cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r887463 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: db/ColumnFamily.java db/Memtable.java service/CassandraServer.java service/StorageProxy.java utils/Pair.java
Date Sat, 05 Dec 2009 00:22:32 GMT
Author: jbellis
Date: Sat Dec  5 00:22:31 2009
New Revision: 887463

URL: http://svn.apache.org/viewvc?rev=887463&view=rev
Log:
r/m misguided attempt at optimizing merging range scan results from multiple nodes
patch by jbellis; reviewed by Stu Hood for CASSANDRA-568

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java   (with props)
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Sat Dec  5 00:22:31 2009
@@ -397,6 +397,14 @@
                : DatabaseDescriptor.getSubComparator(table, columnFamilyName);
     }
 
+    public static ColumnFamily resolve(ColumnFamily cf1, ColumnFamily cf2)
+    {
+        if (cf1 == null)
+            return cf2;
+        cf1.resolve(cf2);
+        return cf1;
+    }
+
     public void resolve(ColumnFamily cf)
     {
         // Row _does_ allow null CF objects :(  seems a necessary evil for efficiency

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Sat Dec  5 00:22:31 2009
@@ -157,12 +157,11 @@
         {
             int oldSize = oldCf.size();
             int oldObjectCount = oldCf.getColumnCount();
-            oldCf.addAll(columnFamily);
+            oldCf.resolve(columnFamily);
             int newSize = oldCf.size();
             int newObjectCount = oldCf.getColumnCount();
             resolveSize(oldSize, newSize);
             resolveCount(oldObjectCount, newObjectCount);
-            oldCf.delete(columnFamily);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Sat Dec  5 00:22:31 2009
@@ -34,6 +34,7 @@
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.thrift.TException;
 
 import flexjson.JSONSerializer;
@@ -568,24 +569,23 @@
             throw new InvalidRequestException("maxRows must be positive");
         }
 
-        Map<String, Collection<IColumn>> colMap; // keys are sorted.
+        List<Pair<String,Collection<IColumn>>> rows;
         try
         {
-            colMap = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, start_key, finish_key, maxRows));
-            if (colMap == null)
-                throw new RuntimeException("KeySlice list should never be null.");
+            rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, predicate, start_key, finish_key, maxRows), consistency_level);
+            assert rows != null;
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
 
-        List<KeySlice> keySlices = new ArrayList<KeySlice>(colMap.size());
-        for (String key : colMap.keySet())
+        List<KeySlice> keySlices = new ArrayList<KeySlice>(rows.size());
+        for (Pair<String, Collection<IColumn>> row : rows)
         {
-            Collection<IColumn> dbList = colMap.get(key);
-            List<ColumnOrSuperColumn> svcList = new ArrayList<ColumnOrSuperColumn>(dbList.size());
-            for (org.apache.cassandra.db.IColumn col : dbList)
+            Collection<IColumn> columns = row.right;
+            List<ColumnOrSuperColumn> svcList = new ArrayList<ColumnOrSuperColumn>(columns.size());
+            for (org.apache.cassandra.db.IColumn col : columns)
             {
                 if (col instanceof org.apache.cassandra.db.Column)
                     svcList.add(new ColumnOrSuperColumn(new org.apache.cassandra.service.Column(col.name(), col.value(), col.timestamp()), null));
@@ -598,7 +598,7 @@
                     svcList.add(new ColumnOrSuperColumn(null, new org.apache.cassandra.service.SuperColumn(col.name(), subCols)));
                 }
             }
-            keySlices.add(new KeySlice(key, svcList));
+            keySlices.add(new KeySlice(row.left, svcList));
         }
 
         return keySlices;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=887463&r1=887462&r2=887463&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Dec  5 00:22:31 2009
@@ -37,6 +37,7 @@
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.TimedStatsDeque;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.gms.FailureDetector;
@@ -527,20 +528,17 @@
         return rows;
     }
 
-    static Map<String, Collection<IColumn>> getRangeSlice(RangeSliceCommand rawCommand) throws IOException, UnavailableException, TimedOutException
+    static List<Pair<String, Collection<IColumn>>> getRangeSlice(RangeSliceCommand command, int consistency_level) throws IOException, UnavailableException, TimedOutException
     {
         long startTime = System.currentTimeMillis();
         TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
-        RangeSliceCommand command = rawCommand;
 
         InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.start_key);
         InetAddress startEndpoint = endPoint;
-        InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
 
-        TreeSet<Row> allRows = new TreeSet<Row>(rowComparator);
+        Map<String, ColumnFamily> rows = new HashMap<String, ColumnFamily>(command.max_keys);
         do
         {
-
             Message message = command.getMessage();
             if (logger.isDebugEnabled())
                 logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endPoint);
@@ -555,44 +553,12 @@
                 throw new TimedOutException();
             }
             RangeSliceReply reply = RangeSliceReply.read(responseBody);
-            List<Row> rangeRows = new ArrayList<Row>(reply.rows);
-
-            // combine these what what has been seen so far.
-            if (rangeRows.size() > 0)
+            for (Row row : reply.rows)
             {
-                if (allRows.size() > 0)
-                {
-                    if (keyComparator.compare(rangeRows.get(rangeRows.size() - 1).key, allRows.first().key) <= 0)
-                    {
-                        // unlikely, but possible
-                        if (rangeRows.get(rangeRows.size() - 1).equals(allRows.first().key))
-                        {
-                            rangeRows.remove(rangeRows.size() - 1);
-                        }
-                        // put all from rangeRows into allRows.
-                        allRows.addAll(rangeRows);
-                    }
-                    else if (keyComparator.compare(allRows.last().key, rangeRows.get(0).key) <= 0)
-                    {
-                        // common case. deal with simple start/end key overlaps
-                        if (allRows.last().key.equals(rangeRows.get(0)))
-                        {
-                            allRows.remove(allRows.last().key);
-                        }
-                        allRows.addAll(rangeRows); // todo: check logic.
-                    }
-                    else
-                    {
-                        // deal with potential large overlap from scanning the first endpoint, which contains
-                        // both the smallest and largest keys
-                        allRows.addAll(rangeRows); // todo: check logic.
-                    }
-                }
-                else
-                    allRows.addAll(rangeRows); // todo: check logic.
+                rows.put(row.key, ColumnFamily.resolve(row.cf, rows.get(row.key)));
             }
 
-            if (allRows.size() >= rawCommand.max_keys || reply.rangeCompletedLocally)
+            if (rows.size() >= command.max_keys || reply.rangeCompletedLocally)
                 break;
 
             do
@@ -600,33 +566,35 @@
                 endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move this into the Strategies & modify for RackAwareStrategy
             }
             while (!FailureDetector.instance().isAlive(endPoint));
-            int maxResults = endPoint == wrapEndpoint ? rawCommand.max_keys : rawCommand.max_keys - allRows.size();
-            command = new RangeSliceCommand(command, maxResults);
         }
         while (!endPoint.equals(startEndpoint));
 
-        Map<String, Collection<IColumn>> results = new TreeMap<String, Collection<IColumn>>();
-        for (Row row : allRows)
+        List<Pair<String, Collection<IColumn>>> results = new ArrayList<Pair<String, Collection<IColumn>>>(rows.size());
+        for (Map.Entry<String, ColumnFamily> entry : rows.entrySet())
         {
-            if (row.cf == null)
-                results.put(row.key, Collections.<IColumn>emptyList());
-            else
-                results.put(row.key, row.cf.getSortedColumns());
+            ColumnFamily cf = entry.getValue();
+            Collection<IColumn> columns = (cf == null) ? Collections.<IColumn>emptyList() : cf.getSortedColumns();
+            results.add(new Pair<String, Collection<IColumn>>(entry.getKey(), columns));
         }
+        Collections.sort(results, new Comparator<Pair<String, Collection<IColumn>>>()
+        {
+            public int compare(Pair<String, Collection<IColumn>> o1, Pair<String, Collection<IColumn>> o2)
+            {
+                return keyComparator.compare(o1.left, o2.left);
+            }
+        });
         rangeStats.add(System.currentTimeMillis() - startTime);
         return results;
     }
 
-    static List<String> getKeyRange(RangeCommand rawCommand) throws IOException, UnavailableException, TimedOutException
+    static List<String> getKeyRange(RangeCommand command) throws IOException, UnavailableException, TimedOutException
     {
         long startTime = System.currentTimeMillis();
         TokenMetadata tokenMetadata = StorageService.instance().getTokenMetadata();
-        List<String> allKeys = new ArrayList<String>();
-        RangeCommand command = rawCommand;
+        Set<String> uniqueKeys = new HashSet<String>(command.maxResults);
 
         InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
         InetAddress startEndpoint = endPoint;
-        InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
 
         do
         {
@@ -646,49 +614,9 @@
                 throw new TimedOutException();
             }
             RangeReply rangeReply = RangeReply.read(responseBody);
-            List<String> rangeKeys = rangeReply.keys;
+            uniqueKeys.addAll(rangeReply.keys);
 
-            // combine keys from most recent response with the others seen so far
-            if (rangeKeys.size() > 0)
-            {
-                if (allKeys.size() > 0)
-                {
-                    if (keyComparator.compare(rangeKeys.get(rangeKeys.size() - 1), allKeys.get(0)) <= 0)
-                    {
-                        // unlikely, but possible
-                        if (rangeKeys.get(rangeKeys.size() - 1).equals(allKeys.get(0)))
-                        {
-                            rangeKeys.remove(rangeKeys.size() - 1);
-                        }
-                        rangeKeys.addAll(allKeys);
-                        allKeys = rangeKeys;
-                    }
-                    else if (keyComparator.compare(allKeys.get(allKeys.size() - 1), rangeKeys.get(0)) <= 0)
-                    {
-                        // common case. deal with simple start/end key overlaps
-                        if (allKeys.get(allKeys.size() - 1).equals(rangeKeys.get(0)))
-                        {
-                            allKeys.remove(allKeys.size() - 1);
-                        }
-                        allKeys.addAll(rangeKeys);
-                    }
-                    else
-                    {
-                        // deal with potential large overlap from scanning the first endpoint, which contains
-                        // both the smallest and largest keys
-                        HashSet<String> keys = new HashSet<String>(allKeys);
-                        keys.addAll(rangeKeys);
-                        allKeys = new ArrayList<String>(keys);
-                        Collections.sort(allKeys);
-                    }
-                }
-                else
-                {
-                    allKeys = rangeKeys;
-                }
-            }
-
-            if (allKeys.size() >= rawCommand.maxResults || rangeReply.rangeCompletedLocally)
+            if (uniqueKeys.size() >= command.maxResults || rangeReply.rangeCompletedLocally)
             {
                 break;
             }
@@ -700,15 +628,15 @@
             // so starting with the largest in our scan of the next node means we'd never see keys from the middle.
             do
             {
-                endPoint = tokenMetadata.getSuccessor(endPoint); // TODO move this into the Strategies & modify for RackAwareStrategy
+                endPoint = tokenMetadata.getSuccessor(endPoint);
             } while (!FailureDetector.instance().isAlive(endPoint));
-            int maxResults = endPoint.equals(wrapEndpoint) ? rawCommand.maxResults : rawCommand.maxResults - allKeys.size();
-            command = new RangeCommand(command.table, command.columnFamily, command.startWith, command.stopAt, maxResults);
         } while (!endPoint.equals(startEndpoint));
 
         rangeStats.add(System.currentTimeMillis() - startTime);
-        return (allKeys.size() > rawCommand.maxResults)
-               ? allKeys.subList(0, rawCommand.maxResults)
+        List<String> allKeys = new ArrayList<String>(uniqueKeys);
+        Collections.sort(allKeys, keyComparator);
+        return (allKeys.size() > command.maxResults)
+               ? allKeys.subList(0, command.maxResults)
                : allKeys;
     }
 

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java?rev=887463&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java Sat Dec  5 00:22:31 2009
@@ -0,0 +1,34 @@
+package org.apache.cassandra.utils;
+
+public class Pair<T1, T2>
+{
+    public final T1 left;
+    public final T2 right;
+
+    public Pair(T1 left, T2 right)
+    {
+        this.left = left;
+        this.right = right;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException("todo");
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        throw new UnsupportedOperationException("todo");
+    }
+
+    @Override
+    public String toString()
+    {
+        return "Pair(" +
+               "left=" + left +
+               ", right=" + right +
+               ')';
+    }
+}

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/Pair.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message