asterixdb-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kiss...@apache.org
Subject incubator-asterixdb git commit: ASTERIXDB-1011: added flow control for merge policy See the design document here: https://cwiki.apache.org/confluence/display/ASTERIXDB/Flush-Operation+Flow+Control+For+Merge+Policy
Date Tue, 12 Apr 2016 22:48:26 GMT
Repository: incubator-asterixdb
Updated Branches:
  refs/heads/master d7d7affcc -> f3bcb73ce


ASTERIXDB-1011: added flow control for merge policy
See the design document here:
https://cwiki.apache.org/confluence/display/ASTERIXDB/Flush-Operation+Flow+Control+For+Merge+Policy

Change-Id: Ide99c022861f96cd60bc8f5795c4964ab02b3e14
Reviewed-on: https://asterix-gerrit.ics.uci.edu/795
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hubailmor@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/f3bcb73c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/f3bcb73c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/f3bcb73c

Branch: refs/heads/master
Commit: f3bcb73ce0d3f13cd09a509456a56fff49ef6fcb
Parents: d7d7aff
Author: Young-Seok Kim <kisskys@gmail.com>
Authored: Tue Apr 12 14:20:11 2016 -0700
Committer: Young-Seok Kim <kisskys@gmail.com>
Committed: Tue Apr 12 15:11:22 2016 -0700

----------------------------------------------------------------------
 .../context/CorrelatedPrefixMergePolicy.java    |  16 +-
 .../am/lsm/common/api/ILSMMergePolicy.java      |  31 ++-
 .../lsm/common/impls/ConstantMergePolicy.java   | 101 ++++++++-
 .../storage/am/lsm/common/impls/LSMHarness.java |  19 ++
 .../am/lsm/common/impls/NoMergePolicy.java      |   9 +-
 .../am/lsm/common/impls/PrefixMergePolicy.java  | 205 +++++++++++++++++--
 6 files changed, 344 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f3bcb73c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index a7374d3..3d112ef 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -46,13 +46,13 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
     private final int datasetID;
 
     public CorrelatedPrefixMergePolicy(IIndexLifecycleManager datasetLifecycleManager, int
datasetID) {
-        this.datasetLifecycleManager = (DatasetLifecycleManager)datasetLifecycleManager;
+        this.datasetLifecycleManager = (DatasetLifecycleManager) datasetLifecycleManager;
         this.datasetID = datasetID;
     }
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws
HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // This merge policy will only look at primary indexes in order to evaluate if a
merge operation is needed. If it decides that
         // a merge operation is needed, then it will merge *all* the indexes that belong
to the dataset. The criteria to decide if a merge
         // is needed is the same as the one that is used in the prefix merge policy:
@@ -113,8 +113,8 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
                     // Reverse the components order back to its original order
                     Collections.reverse(mergableComponents);
 
-                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(
-                            NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+                    ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+                            NoOpOperationCallback.INSTANCE);
                     accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
                 }
                 break;
@@ -127,4 +127,10 @@ public class CorrelatedPrefixMergePolicy implements ILSMMergePolicy {
         maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
         maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        //TODO implement properly according to the merge policy
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f3bcb73c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
index 841117b..c64fe63 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMMergePolicy.java
@@ -25,8 +25,35 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.common.api.IndexException;
 
 public interface ILSMMergePolicy {
-    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested) throws
HyracksDataException,
-            IndexException;
+    public void diskComponentAdded(ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException;
 
     public void configure(Map<String, String> properties);
+
+    /**
+     * This method is used for flush-operation flow control:
+     * When the space occupancy of the in-memory component exceeds a specified memory threshold,
+     * entries are flushed to disk. As entries accumulate on disk, the entries are periodically
+     * merged together subject to a merge policy that decides when and what to merge. A merge
+     * policy may impose a certain constraint such as a maximum number of mergable(merge-able)
+     * disk components in order to provide a reasonable query response time. Otherwise, the
query
+     * response time gets slower as the number of disk components increases.
+     * In order to avoid such an unexpected situation according to the merge policy, a way
to
+     * control the number of disk components is provided by introducing a new method,
+     * isMergeLagging() in ILSMMergePolicy interface. When flushing an in-memory component is completed,
+     * the provided isMergeLagging() method is called to decide whether the memory budget for the
+     * currently flushed in-memory component should be made available for incoming (inserted/deleted/updated)
+     * entries or not. If the method returns true, i.e., the merge operation is lagged according
to
+     * the merge policy, the memory budget will not be made available for the incoming entries
by
+     * making the current flush operation thread wait until (ongoing) merge operation finishes.
+     * Therefore, this will effectively prevent the number of disk components from exceeding
+     * a threshold of the allowed number of disk components.
+     *
+     * @param index
+     * @return true if merge operation is lagged according to the implemented merge policy,
+     *         false otherwise.
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f3bcb73c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
index 93d6fd4..aad5bf4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ConstantMergePolicy.java
@@ -35,20 +35,20 @@ public class ConstantMergePolicy implements ILSMMergePolicy {
     private int numComponents;
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws
HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         List<ILSMComponent> immutableComponents = index.getImmutableComponents();
-        for (ILSMComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
-                return;
-            }
+
+        if (!areComponentsMergable(immutableComponents)) {
+            return;
         }
+
         if (fullMergeIsRequested) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleFullMerge(index.getIOOperationCallback());
         } else if (immutableComponents.size() >= numComponents) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
         }
@@ -58,4 +58,89 @@ public class ConstantMergePolicy implements ILSMMergePolicy {
     public void configure(Map<String, String> properties) {
         numComponents = Integer.parseInt(properties.get("num-components"));
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException
{
+        // see PrefixMergePolicy.isMergeLagging() for the rationale behind this code.
+
+        /**
+         * case 1.
+         * if totalImmutableComponentCount < threshold,
+         * merge operation is not lagged ==> return false.
+         * case 2.
+         * if a) totalImmutableComponentCount >= threshold && b) there is an ongoing merge,
+         * merge operation is lagged. ==> return true.
+         * case 3. *SPECIAL CASE*
+         * if a) totalImmutableComponentCount >= threshold && b) there is *NO* ongoing merge,
+         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+         * This is a special case that requires scheduling a merge operation.
+         * Otherwise, all flush operations will hang.
+         * This case can happen in the following situation:
+         * The system may crash when
+         * condition 1) the totalImmutableComponentCount >= threshold and
+         * condition 2) merge operation is going on.
+         * After the system is recovered, still condition 1) is true.
+         * If there are flush operations in the same dataset partition after the recovery,
+         * all these flush operations may not proceed since there is no ongoing merge and
+         * there will be no new merge either in this situation.
+         */
+
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        int totalImmutableComponentCount = immutableComponents.size();
+
+        // [case 1]
+        if (totalImmutableComponentCount < numComponents) {
+            return false;
+        }
+
+        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+        // here, implicitly (totalImmutableComponentCount >= numComponents) is true by
passing case 1.
+        if (isMergeOngoing) {
+            // [case 2]
+            return true;
+        } else {
+            // [case 3]
+            // schedule a merge operation after making sure that all components are mergable
+            if (!areComponentsMergable(immutableComponents)) {
+                throw new IllegalStateException();
+            }
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
+                    NoOpOperationCallback.INSTANCE);
+            accessor.scheduleMerge(index.getIOOperationCallback(), immutableComponents);
+            return true;
+        }
+    }
+
+    /**
+     * checks whether all given components are mergable or not
+     *
+     * @param immutableComponents
+     * @return true if all components are mergable, false otherwise.
+     */
+    private boolean areComponentsMergable(List<ILSMComponent> immutableComponents)
{
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * This method returns whether there is an ongoing merge operation or not by checking
+     * each component state of given components.
+     *
+     * @param immutableComponents
+     * @return true if there is an ongoing merge operation, false otherwise.
+     */
+    private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) {
+        int size = immutableComponents.size();
+        for (int i = 0; i < size; i++) {
+            if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING)
{
+                return true;
+            }
+        }
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f3bcb73c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index a19532f..b58cc29 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -200,6 +200,25 @@ public class LSMHarness implements ILSMHarness {
         try {
             synchronized (opTracker) {
                 try {
+
+                    /**
+                     * [flow control]
+                     * If merge operations are lagged according to the merge policy,
+                     * flushes of in-memory components are held until the merge operations catch up.
+                     * See PrefixMergePolicy.isMergeLagging() for more details.
+                     */
+                    if (opType == LSMOperationType.FLUSH) {
+                        while (mergePolicy.isMergeLagging(lsmIndex)) {
+                            try {
+                                opTracker.wait();
+                            } catch (InterruptedException e) {
+                                //ignore
+                            }
+                        }
+                    } else if (opType == LSMOperationType.MERGE) {
+                        opTracker.notifyAll();
+                    }
+
                     int i = 0;
                     // First check if there is any action that is needed to be taken based
on the state of each component.
                     for (ILSMComponent c : ctx.getComponentHolder()) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f3bcb73c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
index 39ab815..86be9c8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicy.java
@@ -28,8 +28,8 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
 public class NoMergePolicy implements ILSMMergePolicy {
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws
HyracksDataException,
-            IndexException {
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
         // Do nothing
     }
 
@@ -37,4 +37,9 @@ public class NoMergePolicy implements ILSMMergePolicy {
     public void configure(Map<String, String> properties) {
         // Do nothing
     }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) {
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/f3bcb73c/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
index 5b8da53..36cb958 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/PrefixMergePolicy.java
@@ -39,27 +39,196 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
     private int maxToleranceComponentCount;
 
     @Override
-    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested) throws
HyracksDataException,
-            IndexException {
-        // 1.  Look at the candidate components for merging in oldest-first order.  If one
exists, identify the prefix of the sequence of
-        // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule
a merge of those components into a new component.
-        // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for
merging exceeds MaxTolCompCnt.  If so, schedule
-        // a merge all of the current candidates into a new single component.
+    public void diskComponentAdded(final ILSMIndex index, boolean fullMergeIsRequested)
+            throws HyracksDataException, IndexException {
+
         List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
-        // Reverse the components order so that we look at components from oldest to newest.
-        Collections.reverse(immutableComponents);
 
-        for (ILSMComponent c : immutableComponents) {
-            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
-                return;
-            }
+        if (!areComponentsReadableWritableState(immutableComponents)) {
+            return;
         }
+
         if (fullMergeIsRequested) {
-            ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+            ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                     NoOpOperationCallback.INSTANCE);
             accessor.scheduleFullMerge(index.getIOOperationCallback());
             return;
         }
+
+        scheduleMerge(index);
+    }
+
+    @Override
+    public void configure(Map<String, String> properties) {
+        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
+        maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
+    }
+
+    @Override
+    public boolean isMergeLagging(ILSMIndex index) throws HyracksDataException, IndexException
{
+
+        /**
+         * [for flow-control purpose]
+         * when merge operations are lagged, threads which flushed components will be blocked
+         * until merge operations catch up, i.e., until the number of mergable immutable components < maxToleranceComponentCount
+         * example:
+         * suppose that maxToleranceComponentCount = 3 and maxMergableComponentSize = 1GB
+         * The following shows a set of events occurred in time ti with a brief description.
+         * time
+         * t40: c32-1(1GB, RU) c38-33(192MB, RU) c39-39(32MB, RU) c40-40(32MB, RU)
+         * --> a thread which added c40-40 will trigger a merge including c38-33,c39-39,c40-40
+         * t41: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB,
RU)
+         * --> a thread which added c41-41 will not be blocked
+         * t42: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB,
RU) c42-42(32MB, RU)
+         * --> a thread which added c42-42 will not be blocked
+         * t43: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB,
RU) c42-42(32MB, RU) c43-43(32MB, RU)
+         * --> a thread which added c43-43 will not be blocked and will not trigger a merge since there is an ongoing merge triggered in t40.
+         * t44: c32-1(1GB, RU) c38-33(192MB, RUM) c39-39(32MB, RUM) c40-40(32MB, RUM) c41-41(32MB,
RU) c42-42(32MB, RU) c43-43(32MB, RU) 'c44-44(32MB, RU)'
+         * --> a thread which will add c44-44 (even if the disk component is created,
but not added to index instance disk components yet)
+         * will be blocked until the number of RU components < maxToleranceComponentCount
+         * t45: c32-1(1GB, RU) *c40-33(256MB, RU)* c41-41(32MB, RU) c42-42(32MB, RU) c43-43(32MB,
RU) 'c44-44(32MB, RU)'
+         * --> a thread which completed the merge triggered in t40 added c40-33 and will go ahead and trigger the next merge with c40-33,c41-41,c42-42,c43-43.
+         * Still, the blocked thread will continue being blocked and the c44-44 was not included
in the merge since it's not added yet.
+         * t46: c32-1(1GB, RU) c40-33(256MB, RUM) c41-41(32MB, RUM) c42-42(32MB, RUM) c43-43(32MB, RUM) c44-44(32MB, RU)
+         * --> the merge triggered in t45 is going on and the merge unblocked the blocked thread, so c44-44 was added.
+         * (c44-44 is not part of that merge, so it stays RU.)
+         * t47: c32-1(1GB, RU) *c43-33(352MB, RU)* c44-44(32MB, RU)
+         * --> a thread completed the merge triggered in t45 and added c43-33 (256MB + 3 x 32MB = 352MB).
+         * t48: c32-1(1GB, RU) c43-33(352MB, RU) c44-44(32MB, RU) c48-48(32MB, RU)
+         * --> a thread added c48-48 and will not be blocked and will trigger a merge with c43-33, c44-44, c48-48.
+         * ... continues ...
+         * ----------------------------------------
+         * legend:
+         * For example, C32-1 represents a disk component, more specifically, disk component
name, where 32-1 represents a timestamp range from t1 to time t32.
+         * This means that the component C32-1 is a component resulting from a merge operation
that merged components C1-1 to C32-32.
+         * This also implies that if two timestamps in a component name are equal, the component
has not been merged yet after it was created.
+         * RU and RUM are possible states of disk components, where RU represents READABLE_UNWRITABLE and RUM represents READABLE_MERGING.
+         * Now, c32-1(1GB, RU) represents a disk component resulted from merging c1-1 ~ c32-32
and the component size is 1GB.
+         * ----------------------------------------
+         * The flow control allows at most maxToleranceComponentCount mergable components,
+         * where the mergable components are the newest consecutive disk components whose
+         * i) state == RU and ii) size <= maxMergableComponentSize.
+         */
+
+        /**
+         * case 1.
+         * if mergableImmutableComponentCount < threshold,
+         * merge operation is not lagged ==> return false.
+         * case 2.
+         * if a) mergableImmutableComponentCount >= threshold && b) there is an ongoing merge,
+         * merge operation is lagged. ==> return true.
+         * case 3. *SPECIAL CASE*
+         * if a) mergableImmutableComponentCount >= threshold && b) there is *NO* ongoing merge,
+         * merge operation is lagged. ==> *schedule a merge operation* and then return true.
+         * This is a special case that requires scheduling a merge operation.
+         * Otherwise, all flush operations will hang.
+         * This case can happen in the following situation:
+         * The system may crash when
+         * condition 1) the mergableImmutableComponentCount >= threshold and
+         * condition 2) merge operation is going on.
+         * After the system is recovered, still condition 1) is true.
+         * If there are flush operations in the same dataset partition after the recovery,
+         * all these flush operations may not proceed since there is no ongoing merge and
+         * there will be no new merge either in this situation.
+         */
+
+        List<ILSMComponent> immutableComponents = index.getImmutableComponents();
+        int mergableImmutableComponentCount = getMergableImmutableComponentCount(immutableComponents);
+
+        // [case 1]
+        if (mergableImmutableComponentCount < maxToleranceComponentCount) {
+            return false;
+        }
+
+        boolean isMergeOngoing = isMergeOngoing(immutableComponents);
+
+        // here, implicitly (mergableImmutableComponentCount >= maxToleranceComponentCount)
is true by passing case 1.
+        if (isMergeOngoing) {
+            // [case 2]
+            return true;
+        } else {
+            // [case 3]
+            // make sure that all components are of READABLE_UNWRITABLE state.
+            if (!areComponentsReadableWritableState(immutableComponents)) {
+                throw new IllegalStateException();
+            }
+            // schedule a merge operation
+            boolean isMergeTriggered = scheduleMerge(index);
+            if (!isMergeTriggered) {
+                throw new IllegalStateException();
+            }
+            return true;
+        }
+    }
+
+    /**
+     * This method returns whether there is an ongoing merge operation or not by checking
+     * each component state of given components.
+     *
+     * @param immutableComponents
+     * @return true if there is an ongoing merge operation, false otherwise.
+     */
+    private boolean isMergeOngoing(List<ILSMComponent> immutableComponents) {
+        int size = immutableComponents.size();
+        for (int i = 0; i < size; i++) {
+            if (immutableComponents.get(i).getState() == ComponentState.READABLE_MERGING)
{
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * This method returns the number of mergable components among the given list
+     * of immutable components that are ordered from the latest component to older ones. A caller
+     * needs to make sure of the order in the list. Counting stops at the first component that is
+     * not READABLE_UNWRITABLE or whose size exceeds maxMergableComponentSize.
+     *
+     * @param immutableComponents
+     * @return the number of mergable components
+     */
+    private int getMergableImmutableComponentCount(List<ILSMComponent> immutableComponents)
{
+        int count = 0;
+        for (ILSMComponent c : immutableComponents) {
+            long componentSize = ((AbstractDiskLSMComponent) c).getComponentSize();
+            //stop when the first non-mergable component is found.
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE || componentSize >
maxMergableComponentSize) {
+                break;
+            }
+            ++count;
+        }
+        return count;
+    }
+
+    /**
+     * checks whether all given components are of READABLE_UNWRITABLE state
+     *
+     * @param immutableComponents
+     * @return true if all components are of READABLE_UNWRITABLE state, false otherwise.
+     */
+    private boolean areComponentsReadableWritableState(List<ILSMComponent> immutableComponents)
{
+        for (ILSMComponent c : immutableComponents) {
+            if (c.getState() != ComponentState.READABLE_UNWRITABLE) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * schedule a merge operation according to this prefix merge policy
+     *
+     * @param index
+     * @return true if merge is scheduled, false otherwise.
+     * @throws HyracksDataException
+     * @throws IndexException
+     */
+    private boolean scheduleMerge(final ILSMIndex index) throws HyracksDataException, IndexException
{
+        // 1.  Look at the candidate components for merging in oldest-first order.  If one
exists, identify the prefix of the sequence of
+        // all such components for which the sum of their sizes exceeds MaxMrgCompSz.  Schedule
a merge of those components into a new component.
+        // 2.  If a merge from 1 doesn't happen, see if the set of candidate components for
merging exceeds MaxTolCompCnt.  If so, schedule
+        // a merge all of the current candidates into a new single component.
+        List<ILSMComponent> immutableComponents = new ArrayList<ILSMComponent>(index.getImmutableComponents());
+        // Reverse the components order so that we look at components from oldest to newest.
+        Collections.reverse(immutableComponents);
+
         long totalSize = 0;
         int startIndex = -1;
         for (int i = 0; i < immutableComponents.size(); i++) {
@@ -80,17 +249,13 @@ public class PrefixMergePolicy implements ILSMMergePolicy {
                 }
                 // Reverse the components order back to its original order
                 Collections.reverse(mergableComponents);
-                ILSMIndexAccessor accessor = (ILSMIndexAccessor) index.createAccessor(NoOpOperationCallback.INSTANCE,
+                ILSMIndexAccessor accessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
                         NoOpOperationCallback.INSTANCE);
                 accessor.scheduleMerge(index.getIOOperationCallback(), mergableComponents);
-                break;
+                return true;
             }
         }
+        return false;
     }
 
-    @Override
-    public void configure(Map<String, String> properties) {
-        maxMergableComponentSize = Long.parseLong(properties.get("max-mergable-component-size"));
-        maxToleranceComponentCount = Integer.parseInt(properties.get("max-tolerance-component-count"));
-    }
 }


Mime
View raw message