phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-1954 Reserve chunks of numbers for a sequence (Jan Fernando)
Date Fri, 10 Jul 2015 05:38:13 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 cf803342a -> a85063e4c


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 69520b0..a9f455a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -135,6 +135,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.ReadOnlyTableException;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -2272,8 +2273,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
      * Verifies that sequences exist and reserves values for them if reserveValues is true
      */
     @Override
-    public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[]
values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
-        incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, action);
+    public void validateSequences(List<SequenceAllocation> sequenceAllocations, long
timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException
{
+        incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, action);
     }
 
     /**
@@ -2286,14 +2287,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
      *
      */
     @Override
-    public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp,
long[] values, SQLException[] exceptions) throws SQLException {
-        incrementSequenceValues(sequenceKeys, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
+    public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long
timestamp, long[] values, SQLException[] exceptions) throws SQLException {
+        incrementSequenceValues(sequenceAllocations, timestamp, values, exceptions, Sequence.ValueOp.INCREMENT_SEQUENCE);
     }
 
     @SuppressWarnings("deprecation")
-    private void incrementSequenceValues(List<SequenceKey> keys, long timestamp, long[]
values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException {
-        List<Sequence> sequences = Lists.newArrayListWithExpectedSize(keys.size());
-        for (SequenceKey key : keys) {
+    private void incrementSequenceValues(List<SequenceAllocation> sequenceAllocations,
long timestamp, long[] values, SQLException[] exceptions, Sequence.ValueOp op) throws SQLException
{
+        List<Sequence> sequences = Lists.newArrayListWithExpectedSize(sequenceAllocations.size());
+        for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+            SequenceKey key = sequenceAllocation.getSequenceKey();
             Sequence newSequences = new Sequence(key);
             Sequence sequence = sequenceMap.putIfAbsent(key, newSequences);
             if (sequence == null) {
@@ -2312,11 +2314,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
             for (int i = 0; i < sequences.size(); i++) {
                 Sequence sequence = sequences.get(i);
                 try {
-                    values[i] = sequence.incrementValue(timestamp, op);
+                    values[i] = sequence.incrementValue(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
                 } catch (EmptySequenceCacheException e) {
                     indexes[toIncrementList.size()] = i;
                     toIncrementList.add(sequence);
-                    Increment inc = sequence.newIncrement(timestamp, op);
+                    Increment inc = sequence.newIncrement(timestamp, op, sequenceAllocations.get(i).getNumAllocations());
                     incrementBatch.add(inc);
                 } catch (SQLException e) {
                     exceptions[i] = e;
@@ -2355,7 +2357,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
                 Sequence sequence = toIncrementList.get(i);
                 Result result = (Result)resultObjects[i];
                 try {
-                    values[indexes[i]] = sequence.incrementValue(result, op);
+                    long numToAllocate = Bytes.toLong(incrementBatch.get(i).getAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE));
+                    values[indexes[i]] = sequence.incrementValue(result, op, numToAllocate);
                 } catch (SQLException e) {
                     exceptions[indexes[i]] = e;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index 4d582be..3fa0c1e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -66,6 +66,7 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceAlreadyExistsException;
 import org.apache.phoenix.schema.SequenceInfo;
 import org.apache.phoenix.schema.SequenceKey;
@@ -385,13 +386,13 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
imple
     }
 
     @Override
-    public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[]
values,
-            SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
+    public void validateSequences(List<SequenceAllocation> sequenceAllocations, long
timestamp,
+            long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException
{
         int i = 0;
-        for (SequenceKey key : sequenceKeys) {
-            SequenceInfo info = sequenceMap.get(key);
+        for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+            SequenceInfo info = sequenceMap.get(sequenceAllocation.getSequenceKey());
             if (info == null) {
-                exceptions[i] = new SequenceNotFoundException(key.getSchemaName(), key.getSequenceName());
+                exceptions[i] = new SequenceNotFoundException(sequenceAllocation.getSequenceKey().getSchemaName(),
sequenceAllocation.getSequenceKey().getSequenceName());
             } else {
                 values[i] = info.sequenceValue;          
             }
@@ -400,10 +401,11 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
imple
     }
 
     @Override
-    public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp,
long[] values,
-            SQLException[] exceptions) throws SQLException {
+    public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long
timestamp,
+            long[] values, SQLException[] exceptions) throws SQLException {
         int i = 0;
-		for (SequenceKey key : sequenceKeys) {
+		for (SequenceAllocation sequenceAllocation : sequenceAllocations) {
+		    SequenceKey key = sequenceAllocation.getSequenceKey();
 			SequenceInfo info = sequenceMap.get(key);
 			if (info == null) {
 				exceptions[i] = new SequenceNotFoundException(
@@ -429,7 +431,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
imple
         i = 0;
         for (SQLException e : exceptions) {
             if (e != null) {
-                sequenceMap.remove(sequenceKeys.get(i));
+                sequenceMap.remove(sequenceAllocations.get(i).getSequenceKey());
             }
             i++;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 2a98cd5..4153652 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.SequenceAllocation;
 import org.apache.phoenix.schema.SequenceKey;
 import org.apache.phoenix.schema.stats.PTableStats;
 
@@ -183,15 +184,15 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices
imple
     }
 
     @Override
-    public void validateSequences(List<SequenceKey> sequenceKeys, long timestamp, long[]
values,
-            SQLException[] exceptions, Sequence.ValueOp action) throws SQLException {
-        getDelegate().validateSequences(sequenceKeys, timestamp, values, exceptions, action);
+    public void validateSequences(List<SequenceAllocation> sequenceAllocations, long
timestamp,
+            long[] values, SQLException[] exceptions, Sequence.ValueOp action) throws SQLException
{
+        getDelegate().validateSequences(sequenceAllocations, timestamp, values, exceptions,
action);
     }
 
     @Override
-    public void incrementSequences(List<SequenceKey> sequenceKeys, long timestamp,
long[] values,
-            SQLException[] exceptions) throws SQLException {
-        getDelegate().incrementSequences(sequenceKeys, timestamp, values, exceptions);
+    public void incrementSequences(List<SequenceAllocation> sequenceAllocations, long
timestamp,
+            long[] values, SQLException[] exceptions) throws SQLException {
+        getDelegate().incrementSequences(sequenceAllocations, timestamp, values, exceptions);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 9e74d2a..9d0c1aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1255,7 +1255,7 @@ public class MetaDataClient {
                         dataTable.getTimeStamp());
                     long[] seqValues = new long[1];
                     SQLException[] sqlExceptions = new SQLException[1];
-                    connection.getQueryServices().incrementSequences(Collections.singletonList(key),
+                    connection.getQueryServices().incrementSequences(Collections.singletonList(new
SequenceAllocation(key, 1)),
                             Math.max(timestamp, dataTable.getTimeStamp()), seqValues, sqlExceptions);
                     if (sqlExceptions[0] != null) {
                         throw sqlExceptions[0];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
index aeba58b..adca5e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/Sequence.java
@@ -62,7 +62,7 @@ import com.google.common.math.LongMath;
 public class Sequence {
     public static final int SUCCESS = 0;
     
-    public enum ValueOp {VALIDATE_SEQUENCE, RESERVE_SEQUENCE, INCREMENT_SEQUENCE};
+    public enum ValueOp {VALIDATE_SEQUENCE, INCREMENT_SEQUENCE};
     public enum MetaOp {CREATE_SEQUENCE, DROP_SEQUENCE, RETURN_SEQUENCE};
     
     // create empty Sequence key values used while created a sequence row
@@ -144,7 +144,7 @@ public class Sequence {
         return value.isDeleted ? null : value;
     }
     
-    private long increment(SequenceValue value, ValueOp op) throws SQLException {       
+    private long increment(SequenceValue value, ValueOp op, long numToAllocate) throws SQLException
{       
         boolean increasingSeq = value.incrementBy > 0 && op != ValueOp.VALIDATE_SEQUENCE;
         // check if the the sequence has already reached the min/max limit
         if (value.limitReached && op != ValueOp.VALIDATE_SEQUENCE) {           
@@ -165,7 +165,8 @@ public class Sequence {
             boolean overflowOrUnderflow=false;
             // advance currentValue while checking for overflow
             try {
-                value.currentValue = LongMath.checkedAdd(value.currentValue, value.incrementBy);
+                // advance by numToAllocate * the increment amount
+                value.currentValue = LongMath.checkedAdd(value.currentValue, numToAllocate
* value.incrementBy);
             } catch (ArithmeticException e) {
                 overflowOrUnderflow = true;
             }
@@ -180,18 +181,92 @@ public class Sequence {
         return returnValue;
     }
 
-    public long incrementValue(long timestamp, ValueOp op) throws SQLException {
+    public long incrementValue(long timestamp, ValueOp op, long numToAllocate) throws SQLException
{
         SequenceValue value = findSequenceValue(timestamp);
         if (value == null) {
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
         }
-        if (value.currentValue == value.nextValue) {
+         
+        if (isSequenceCacheExhausted(numToAllocate, value)) {
             if (op == ValueOp.VALIDATE_SEQUENCE) {
                 return value.currentValue;
             }
             throw EMPTY_SEQUENCE_CACHE_EXCEPTION;
-        }    
-        return increment(value, op);
+        }
+        return increment(value, op, numToAllocate);
+    }
+    
+    /**
+     * This method first checks whether value.currentValue = value.nextValue, this check
is what 
+     * determines whether we need to refresh the cache when evaluating NEXT VALUE FOR. Once

+     * current value reaches the next value we know the cache is exhausted as we give sequence
+     * values out one at time. 
+     * 
+     * However for bulk allocations, evaluated by NEXT <n> VALUE FOR, we need a different
check
+     * @see isSequenceCacheExhaustedForBulkAllocation
+     * 
+     * Using the bulk allocation method for determining if the cache is exhausted for both
cases
+     * works in most of the cases, however when dealing with CYCLEs and overflow and underflow,
things
+     * break down due to things like sign changes that can happen if we overflow from a positive
to
+     * a negative number and vice versa. Therefore, leaving both checks in place. 
+     * 
+     */
+    private boolean isSequenceCacheExhausted(final long numToAllocate, final SequenceValue
value) throws SQLException {
+        return value.currentValue == value.nextValue || (SequenceUtil.isBulkAllocation(numToAllocate)
&& isSequenceCacheExhaustedForBulkAllocation(numToAllocate, value));
+    }
+
+    /**
+     * This method checks whether there are sufficient values in the SequenceValue
+     * cached on the client to allocate the requested number of slots. It handles
+     * decreasing and increasing sequences as well as any overflows or underflows
+     * encountered.
+     */
+    private boolean isSequenceCacheExhaustedForBulkAllocation(final long numToAllocate, final
SequenceValue value) throws SQLException {
+        long targetSequenceValue;
+        
+        performValidationForBulkAllocation(numToAllocate, value);
+        
+        try {
+            targetSequenceValue = LongMath.checkedAdd(value.currentValue, numToAllocate *
value.incrementBy);
+        } catch (ArithmeticException e) {
+            // Perform a CheckedAdd to make sure if over/underflow 
+            // We don't treat this as the cache being exhausted as the current value may
be valid in the case
+            // of no cycle, logic in increment() will take care of detecting we've hit the
limit of the sequence
+            return false;
+        }
+
+        if (value.incrementBy > 0) {
+            return targetSequenceValue > value.nextValue;
+        } else {
+            return  targetSequenceValue < value.nextValue;    
+        }
+    }
+    
+    /**
+     * @throws SQLException with the correct error code if sequence limit is reached with
+     * this request for allocation or we attempt to perform a bulk allocation on a sequence
+     * with cycles.
+     */
+    private void performValidationForBulkAllocation(final long numToAllocate, final SequenceValue
value)
+            throws SQLException {
+        boolean increasingSeq = value.incrementBy > 0 ? true : false;
+        
+        // We don't support Bulk Allocations on sequences that have the CYCLE flag set to
true
+        // Check for this here so we fail on expression evaluation and don't allow corner
case
+        // whereby a client requests less than cached number of slots on sequence with cycle
to succeed
+        if (value.cycle && !SequenceUtil.isCycleAllowed(numToAllocate)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.NUM_SEQ_TO_ALLOCATE_NOT_SUPPORTED)
+            .setSchemaName(key.getSchemaName())
+            .setTableName(key.getSequenceName())
+            .build().buildException();
+        }
+        
+        if (SequenceUtil.checkIfLimitReached(value.currentValue, value.minValue, value.maxValue,
value.incrementBy, value.cacheSize, numToAllocate)) {
+            throw new SQLExceptionInfo.Builder(SequenceUtil.getLimitReachedErrorCode(increasingSeq))
+            .setSchemaName(key.getSchemaName())
+            .setTableName(key.getSequenceName())
+            .build().buildException();
+        }
     }
 
     public List<Append> newReturns() {
@@ -249,7 +324,7 @@ public class Sequence {
         return key;
     }
 
-    public long incrementValue(Result result, ValueOp op) throws SQLException {
+    public long incrementValue(Result result, ValueOp op, long numToAllocate) throws SQLException
{
         // In this case, we don't definitely know the timestamp of the deleted sequence,
         // but we know anything older is likely deleted. Worse case, we remove a sequence
         // from the cache that we shouldn't have which will cause a gap in sequence values.
@@ -270,19 +345,21 @@ public class Sequence {
                 .build().buildException();
         }
         // If we found the sequence, we update our cache with the new value
-        SequenceValue value = new SequenceValue(result, op);
+        SequenceValue value = new SequenceValue(result, op, numToAllocate);
         insertSequenceValue(value);
-        return increment(value, op);
+        return increment(value, op, numToAllocate);
     }
 
+
     @SuppressWarnings("deprecation")
-    public Increment newIncrement(long timestamp, Sequence.ValueOp action) {
+    public Increment newIncrement(long timestamp, Sequence.ValueOp action, long numToAllocate)
{
         Increment inc = new Increment(key.getKey());
         // It doesn't matter what we set the amount too - we always use the values we get
         // from the Get we do to prevent any race conditions. All columns that get added
         // are returned with their current value
         try {
             inc.setTimeRange(MetaDataProtocol.MIN_TABLE_TIMESTAMP, timestamp);
+            inc.setAttribute(SequenceRegionObserver.NUM_TO_ALLOCATE, Bytes.toBytes(numToAllocate));
         } catch (IOException e) {
             throw new RuntimeException(e); // Impossible
         }
@@ -413,7 +490,7 @@ public class Sequence {
             return this.incrementBy == 0;
         }
         
-        public SequenceValue(Result r, ValueOp op) {
+        public SequenceValue(Result r, ValueOp op, long numToAllocate) {
             KeyValue currentValueKV = getCurrentValueKV(r);
             KeyValue incrementByKV = getIncrementByKV(r);
             KeyValue cacheSizeKV = getCacheSizeKV(r);
@@ -429,8 +506,12 @@ public class Sequence {
             this.cycle = (Boolean) PBoolean.INSTANCE.toObject(cycleKV.getValueArray(), cycleKV.getValueOffset(),
cycleKV.getValueLength());
             this.limitReached = false;
             currentValue = nextValue;
+            
             if (op != ValueOp.VALIDATE_SEQUENCE) {
-                currentValue -= incrementBy * cacheSize;
+                // We can't just take the max of numToAllocate and cacheSize
+                // We need to handle a valid edgecase where a client requests bulk allocation
of 
+                // a number of slots that are less than cache size of the sequence
+                currentValue -= incrementBy * (SequenceUtil.isBulkAllocation(numToAllocate)
? numToAllocate : cacheSize);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
new file mode 100644
index 0000000..afb4a20
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/SequenceAllocation.java
@@ -0,0 +1,48 @@
+package org.apache.phoenix.schema;
+
+/**
+ * A SequenceKey and the number of slots requested to be allocated for the sequence. 
+ * It binds these two together to allow operations such as sorting
+ * a Collection of SequenceKeys and at the same time preserving the associated requested
+ * number of slots to allocate.
+ * 
+ * This class delegates hashCode, equals and compareTo to @see{SequenceKey}.
+ *
+ */
+public class SequenceAllocation implements Comparable<SequenceAllocation> {
+    
+    private final SequenceKey sequenceKey;
+    private final long numAllocations;
+    
+    public SequenceAllocation(SequenceKey sequenceKey, long numAllocations) {
+        this.sequenceKey = sequenceKey;
+        this.numAllocations = numAllocations;
+    }
+    
+    
+    public SequenceKey getSequenceKey() {
+        return sequenceKey;
+    }
+
+
+    public long getNumAllocations() {
+        return numAllocations;
+    }
+
+
+    @Override
+    public int hashCode() {
+        return sequenceKey.hashCode();
+    }
+    
+    @Override
+    public boolean equals(Object obj) {
+        return sequenceKey.equals(obj);
+    }
+    
+    @Override
+    public int compareTo(SequenceAllocation that) {
+        return sequenceKey.compareTo(that.sequenceKey);
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
index f97d565..acf1864 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SequenceUtil.java
@@ -16,6 +16,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.SequenceInfo;
 
+import com.google.common.base.Preconditions;
 import com.google.common.math.LongMath;
 
 /**
@@ -23,17 +24,24 @@ import com.google.common.math.LongMath;
  */
 public class SequenceUtil {
 
+    public static final long DEFAULT_NUM_SLOTS_TO_ALLOCATE = 1L; 
+    
     /**
-     * Returns the nextValue of a sequence 
-     * @throws SQLException if cycle is false and the sequence limit has been reached
+     * @return true if we limit of a sequence has been reached.
      */
     public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue,
-            long incrementBy, long cacheSize) throws SQLException {
+            long incrementBy, long cacheSize, long numToAllocate) {
         long nextValue = 0;
         boolean increasingSeq = incrementBy > 0 ? true : false;
         // advance currentValue while checking for overflow    
         try {
-            long incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize);
+            long incrementValue;
+            if (isBulkAllocation(numToAllocate)) {
+                // For bulk allocation we increment independent of cache size
+                incrementValue = LongMath.checkedMultiply(incrementBy, numToAllocate);
+            } else {
+                incrementValue = LongMath.checkedMultiply(incrementBy, cacheSize);
+            }
             nextValue = LongMath.checkedAdd(currentValue, incrementValue);
         } catch (ArithmeticException e) {
             return true;
@@ -46,9 +54,28 @@ public class SequenceUtil {
         }
         return false;
     }
+
+    public static boolean checkIfLimitReached(long currentValue, long minValue, long maxValue,
+            long incrementBy, long cacheSize) throws SQLException {
+        return checkIfLimitReached(currentValue, minValue, maxValue, incrementBy, cacheSize,
DEFAULT_NUM_SLOTS_TO_ALLOCATE);
+    }
     
     public static boolean checkIfLimitReached(SequenceInfo info) throws SQLException {
-        return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy,
info.cacheSize);
+        return checkIfLimitReached(info.sequenceValue, info.minValue, info.maxValue, info.incrementBy,
info.cacheSize, DEFAULT_NUM_SLOTS_TO_ALLOCATE);
+    }
+    
+    /**
+     * Returns true if the value of numToAllocate signals that a bulk allocation of sequence
slots
+     * was requested. Prevents proliferation of same comparison in many places throughout
the code.
+     */
+    public static boolean isBulkAllocation(long numToAllocate) {
+        Preconditions.checkArgument(numToAllocate > 0);
+        return numToAllocate > DEFAULT_NUM_SLOTS_TO_ALLOCATE;
+    }
+    
+    public static boolean isCycleAllowed(long numToAllocate) {
+        return !isBulkAllocation(numToAllocate);  
+        
     }
     
     /**
@@ -59,5 +86,15 @@ public class SequenceUtil {
         return new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName)
                 .build().buildException();
     }
+    
+    /**
+     * Returns the correct instance of SQLExceptionCode when we detect a limit has been reached,
+     * depending upon whether a min or max value caused the limit to be exceeded.
+     */
+    public static SQLExceptionCode getLimitReachedErrorCode(boolean increasingSeq) {
+        SQLExceptionCode code = increasingSeq ? SQLExceptionCode.SEQUENCE_VAL_REACHED_MAX_VALUE
+                : SQLExceptionCode.SEQUENCE_VAL_REACHED_MIN_VALUE;
+        return code;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
new file mode 100644
index 0000000..4a825f2
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/SequenceAllocationTest.java
@@ -0,0 +1,59 @@
+package org.apache.phoenix.schema;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SequenceAllocationTest {
+
+    @Test
+    /**
+     * Validates that sorting a List of SequenceAllocation instances
+     * results in the same sort order as sorting SequenceKey instances.
+     */
+    public void testSortingSequenceAllocation() {
+        
+        // Arrange
+        SequenceKey sequenceKey1 = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        SequenceKey sequenceKey2 = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        SequenceKey sequenceKey3 = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        List<SequenceKey> sequenceKeys = Lists.newArrayList(sequenceKey1, sequenceKey2,
sequenceKey3);
+        List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKey2,
1), new SequenceAllocation(sequenceKey1, 1), new SequenceAllocation(sequenceKey3, 1));
+        
+        // Act
+        Collections.sort(sequenceKeys);
+        Collections.sort(sequenceAllocations);
+        
+        // Assert
+        int i = 0;
+        for (SequenceKey sequenceKey : sequenceKeys) {
+            assertEquals(sequenceKey, sequenceAllocations.get(i).getSequenceKey());
+            i++;
+        }
+    }
+    
+    @Test
+    public void testSortingSequenceAllocationPreservesAllocations() {
+        
+        // Arrange
+        SequenceKey sequenceKeyC = new SequenceKey(null, "seqalloc", "sequenceC",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        SequenceKey sequenceKeyB = new SequenceKey(null, "seqalloc", "sequenceB",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        SequenceKey sequenceKeyA = new SequenceKey(null, "seqalloc", "sequenceA",QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS);
+        List<SequenceAllocation> sequenceAllocations = Lists.newArrayList(new SequenceAllocation(sequenceKeyB,
15), new SequenceAllocation(sequenceKeyC, 11), new SequenceAllocation(sequenceKeyA, 1000));
+        
+        // Act
+        Collections.sort(sequenceAllocations);
+        
+        // Assert
+        assertEquals("sequenceA",sequenceAllocations.get(0).getSequenceKey().getSequenceName());
+        assertEquals(1000,sequenceAllocations.get(0).getNumAllocations());
+        assertEquals(15,sequenceAllocations.get(1).getNumAllocations());
+        assertEquals(11,sequenceAllocations.get(2).getNumAllocations());
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a85063e4/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
index f25a213..2abc482 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/SequenceUtilTest.java
@@ -62,4 +62,58 @@ public class SequenceUtilTest {
     public void testDescendingOverflowCycle() throws SQLException {
     	assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/*
incrementBy */, CACHE_SIZE));
     }
+    
+    @Test
+    public void testBulkAllocationAscendingNextValueGreaterThanMax() throws SQLException
{
+        assertTrue(SequenceUtil.checkIfLimitReached(MAX_VALUE, MIN_VALUE, MAX_VALUE, 2/*
incrementBy */, CACHE_SIZE, 1));
+    }
+    
+    @Test
+    public void testBulkAllocationAscendingNextValueReachLimit() throws SQLException {
+        assertFalse(SequenceUtil.checkIfLimitReached(6, MIN_VALUE, MAX_VALUE, 2/* incrementBy
*/,  CACHE_SIZE, 2));
+    }
+
+    @Test
+    public void testBulkAllocationAscendingNextValueWithinLimit() throws SQLException {
+        assertFalse(SequenceUtil.checkIfLimitReached(5, MIN_VALUE, MAX_VALUE, 2/* incrementBy
*/, CACHE_SIZE, 2));
+
+    }
+    
+    @Test
+    public void testBulkAllocationAscendingOverflow() throws SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(Long.MAX_VALUE, 0, Long.MAX_VALUE, 1/*
incrementBy */, CACHE_SIZE, 100));
+    }
+    
+    
+    @Test
+    public void testBulkAllocationDescendingNextValueLessThanMax() throws SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(10, MIN_VALUE, MAX_VALUE, -2/* incrementBy
*/, CACHE_SIZE, 5));
+    }
+    
+    @Test
+    public void testBulkAllocationDescendingNextValueReachLimit() throws SQLException {
+        assertFalse(SequenceUtil.checkIfLimitReached(7, MIN_VALUE, MAX_VALUE, -2/* incrementBy
*/,  CACHE_SIZE, 3));
+    }
+
+    @Test
+    public void testBulkAllocationDescendingNextValueWithinLimit() throws SQLException {
+        assertFalse(SequenceUtil.checkIfLimitReached(8, MIN_VALUE, MAX_VALUE, -2/* incrementBy
*/, CACHE_SIZE, 2));
+
+    }
+
+    @Test
+    public void testBulkAllocationDescendingOverflowCycle() throws SQLException {
+        assertTrue(SequenceUtil.checkIfLimitReached(Long.MIN_VALUE, Long.MIN_VALUE, 0, -1/*
incrementBy */, CACHE_SIZE, 100));
+    }
+    
+    @Test
+    public void testIsCycleAllowedForBulkAllocation() {
+        assertFalse(SequenceUtil.isCycleAllowed(2));
+    }
+
+    @Test
+    public void testIsCycleAllowedForStandardAllocation() {
+        assertTrue(SequenceUtil.isCycleAllowed(1));
+    }
+    
 }


Mime
View raw message