cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [3/5] git commit: Fix infinite loop when paging queries with IN
Date Wed, 18 Dec 2013 10:24:04 GMT
Fix infinite loop when paging queries with IN

patch by slebresne; reviewed by iamaleksey for CASSANDRA-6464


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7c32ffbb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7c32ffbb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7c32ffbb

Branch: refs/heads/trunk
Commit: 7c32ffbbfae9959edc89ec5fcf9fced1b75c495b
Parents: f7255b5
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Wed Dec 18 11:18:30 2013 +0100
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Wed Dec 18 11:18:30 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../service/pager/AbstractQueryPager.java       |  6 +-
 .../service/pager/MultiPartitionPager.java      | 89 +++++++++++++-------
 .../service/pager/NamesQueryPager.java          |  5 +-
 .../cassandra/service/pager/QueryPagers.java    |  5 +-
 .../service/pager/SinglePartitionPager.java     |  3 +
 .../service/pager/SliceQueryPager.java          |  5 ++
 7 files changed, 76 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 80ed481..5a124ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Expose a total memtable size metric for a CF (CASSANDRA-6391)
  * cqlsh: handle symlinks properly (CASSANDRA-6425)
  * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
+ * Fix potential infinite loop when paging query with IN (CASSANDRA-6464)
 Merged from 1.2:
  * Improved error message on bad properties in DDL queries (CASSANDRA-6453)
  * Randomize batchlog candidates selection (CASSANDRA-6481)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 9372665..6f6772c 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -40,9 +40,9 @@ abstract class AbstractQueryPager implements QueryPager
     protected final IDiskAtomFilter columnFilter;
     private final long timestamp;
 
-    private volatile int remaining;
-    private volatile boolean exhausted;
-    private volatile boolean lastWasRecorded;
+    private int remaining;
+    private boolean exhausted;
+    private boolean lastWasRecorded;
 
     protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
                                  int toFetch,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 2615e9b..35d6752 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -43,44 +43,72 @@ class MultiPartitionPager implements QueryPager
     private final SinglePartitionPager[] pagers;
     private final long timestamp;
 
-    private volatile int current;
-
-    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery)
-    {
-        this(commands, consistencyLevel, localQuery, null);
-    }
+    private int remaining;
+    private int current;
 
     MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
     {
-        this.pagers = new SinglePartitionPager[commands.size()];
+        int i = 0;
+        // If it's not the beginning (state != null), we need to find where we were and skip previous commands
+        // since they are done.
+        if (state != null)
+            for (; i < commands.size(); i++)
+                if (commands.get(i).key.equals(state.partitionKey))
+                    break;
+
+        if (i >= commands.size())
+        {
+            pagers = null;
+            timestamp = -1;
+            return;
+        }
+
+        pagers = new SinglePartitionPager[commands.size() - i];
+        // 'i' is on the first non exhausted pager for the previous page (or the first one)
+        pagers[0] = makePager(commands.get(i), consistencyLevel, localQuery, state);
+        timestamp = commands.get(i).timestamp;
 
-        long tstamp = -1;
-        for (int i = 0; i < commands.size(); i++)
+        // Following ones haven't been started yet
+        for (int j = i + 1; j < commands.size(); j++)
         {
-            ReadCommand command = commands.get(i);
-            if (tstamp == -1)
-                tstamp = command.timestamp;
-            else if (tstamp != command.timestamp)
+            ReadCommand command = commands.get(j);
+            if (command.timestamp != timestamp)
                 throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
-
-            PagingState tmpState = state != null && command.key.equals(state.partitionKey) ? state : null;
-            pagers[i] = command instanceof SliceFromReadCommand
-                      ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, tmpState)
-                      : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery, tmpState);
+            pagers[j - i] = makePager(command, consistencyLevel, localQuery, null);
         }
-        timestamp = tstamp;
+        remaining = state == null ? computeRemaining(pagers) : state.remaining;
+    }
+
+    private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    {
+        return command instanceof SliceFromReadCommand
+             ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, localQuery, state)
+             : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, localQuery);
+    }
+
+    private static int computeRemaining(SinglePartitionPager[] pagers)
+    {
+        long remaining = 0;
+        for (SinglePartitionPager pager : pagers)
+            remaining += pager.maxRemaining();
+        return remaining > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)remaining;
     }
 
     public PagingState state()
     {
+        // Sets current to the first non-exhausted pager
+        if (isExhausted())
+            return null;
+
         PagingState state = pagers[current].state();
-        return state == null
-             ? null
-             : new PagingState(state.partitionKey, state.cellName, maxRemaining());
+        return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining);
     }
 
     public boolean isExhausted()
     {
+        if (remaining <= 0 || pagers == null)
+            return true;
+
         while (current < pagers.length)
         {
             if (!pagers[current].isExhausted())
@@ -93,18 +121,20 @@ class MultiPartitionPager implements QueryPager
 
     public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
     {
-        int remaining = pageSize;
         List<Row> result = new ArrayList<Row>();
 
-        while (!isExhausted() && remaining > 0)
+        int remainingThisQuery = pageSize;
+        while (remainingThisQuery > 0 && !isExhausted())
         {
-            // Exhausted also sets us on the first non-exhausted pager
-            List<Row> page = pagers[current].fetchPage(remaining);
+            // isExhausted has set us on the first non-exhausted pager
+            List<Row> page = pagers[current].fetchPage(remainingThisQuery);
             if (page.isEmpty())
                 continue;
 
             Row row = page.get(0);
-            remaining -= pagers[current].columnCounter().countAll(row.cf).live();
+            int fetched = pagers[current].columnCounter().countAll(row.cf).live();
+            remaining -= fetched;
+            remainingThisQuery -= fetched;
             result.add(row);
         }
 
@@ -113,10 +143,7 @@ class MultiPartitionPager implements QueryPager
 
     public int maxRemaining()
     {
-        int max = 0;
-        for (int i = current; i < pagers.length; i++)
-            max += pagers[i].maxRemaining();
-        return max;
+        return remaining;
     }
 
     public long timestamp()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
index ede1e91..663db22 100644
--- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service.pager;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -55,9 +56,9 @@ public class NamesQueryPager implements SinglePartitionPager
         this.localQuery = localQuery;
     }
 
-    NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state)
+    public ByteBuffer key()
     {
-        this(command, consistencyLevel, localQuery);
+        return command.key;
     }
 
     public ColumnCounter columnCounter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 1601ff6..c353536 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -57,7 +57,8 @@ public class QueryPagers
         {
             List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands;
 
-            int maxQueried = 0;
+            // Using long on purpose, as we could overflow otherwise
+            long maxQueried = 0;
             for (ReadCommand readCmd : commands)
                 maxQueried += maxQueried(readCmd);
 
@@ -78,7 +79,7 @@ public class QueryPagers
     private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, boolean local, PagingState state)
     {
         if (command instanceof SliceByNamesReadCommand)
-            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local, state);
+            return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, local);
         else
             return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, local, state);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 693a20e..51bbf90 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.service.pager;
 
+import java.nio.ByteBuffer;
+
 import org.apache.cassandra.db.filter.ColumnCounter;
 
 /**
@@ -26,5 +28,6 @@ import org.apache.cassandra.db.filter.ColumnCounter;
  */
 public interface SinglePartitionPager extends QueryPager
 {
+    public ByteBuffer key();
     public ColumnCounter columnCounter();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7c32ffbb/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
index e3825a9..cd0c069 100644
--- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java
@@ -54,6 +54,11 @@ public class SliceQueryPager extends AbstractQueryPager implements SinglePartiti
         }
     }
 
+    public ByteBuffer key()
+    {
+        return command.key;
+    }
+
     public PagingState state()
     {
         return lastReturned == null


Mime
View raw message