cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r832266 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/dht/
Date Tue, 03 Nov 2009 02:23:54 GMT
Author: jbellis
Date: Tue Nov  3 02:23:53 2009
New Revision: 832266

URL: http://svn.apache.org/viewvc?rev=832266&view=rev
Log:
CASSANDRA-522
convert replication strategy methods to multimap
patch by jbellis for CASSANDRA-522

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Nov  3 02:23:53 2009
@@ -289,7 +289,7 @@
      * This method forces a compaction of the SSTables on disk. We wait
      * for the process to complete by waiting on a future pointer.
     */
-    List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
+    List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
     {
         assert ranges != null;
         Future<List<SSTableReader>> futurePtr = CompactionManager.instance().submit(ColumnFamilyStore.this, ranges, target);
@@ -717,7 +717,7 @@
         return maxFile;
     }
 
-    List<SSTableReader> doAntiCompaction(List<Range> ranges, InetAddress target) throws IOException
+    List<SSTableReader> doAntiCompaction(Collection<Range> ranges, InetAddress target) throws IOException
     {
         return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Tue Nov  3 02:23:53 2009
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -74,10 +75,10 @@
     static class FileCompactor2 implements Callable<List<SSTableReader>>
     {
         private ColumnFamilyStore columnFamilyStore_;
-        private List<Range> ranges_;
+        private Collection<Range> ranges_;
         private InetAddress target_;
 
-        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
+        FileCompactor2(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
         {
             columnFamilyStore_ = columnFamilyStore;
             ranges_ = ranges;
@@ -187,7 +188,7 @@
         compactor_.submit(new CleanupCompactor(columnFamilyStore));
     }
 
-    public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, List<Range> ranges, InetAddress target)
+    public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore, Collection<Range> ranges, InetAddress target)
     {
         return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Nov  3 02:23:53 2009
@@ -301,7 +301,7 @@
      * do a complete compaction since we can figure out based on the ranges
      * whether the files need to be split.
     */
-    public List<SSTableReader> forceAntiCompaction(List<Range> ranges, InetAddress target)
+    public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
     {
         List<SSTableReader> allResults = new ArrayList<SSTableReader>();
         Set<String> columnFamilies = tableMetadata_.getColumnFamilies();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Tue Nov  3 02:23:53 2009
@@ -18,13 +18,7 @@
 
 package org.apache.cassandra.dht;
 
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
+ import java.util.*;
  import java.util.concurrent.locks.Condition;
  import java.io.IOException;
  import java.io.UnsupportedEncodingException;
@@ -54,6 +48,9 @@
  import org.apache.cassandra.io.SSTableWriter;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Table;
+ import com.google.common.collect.Multimap;
+ import com.google.common.collect.HashMultimap;
+ import com.google.common.collect.ArrayListMultimap;
 
 
  /**
@@ -95,11 +92,11 @@
         {
             public void run()
             {
-                Map<Range, Set<InetAddress>> rangesWithSourceTarget = getRangesWithSources();
+                Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources();
                 if (logger.isDebugEnabled())
                         logger.debug("Beginning bootstrap process for " + address + " ...");
                 /* Send messages to respective folks to stream data over to me */
-                for (Map.Entry<InetAddress, List<Range>> entry : getWorkMap(rangesWithSourceTarget).entrySet())
+                for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
                 {
                     InetAddress source = entry.getKey();
                     BootstrapMetadata bsMetadata = new BootstrapMetadata(address, entry.getValue());
@@ -147,24 +144,24 @@
         }
     }
 
-    Map<Range, Set<InetAddress>> getRangesWithSources()
+    Multimap<Range, InetAddress> getRangesWithSources()
     {
         Map<Token, InetAddress> map = tokenMetadata.cloneTokenEndPointMap();
         assert map.size() > 0;
         map.put(token, address);
-        Set<Range> myRanges = replicationStrategy.getAddressRanges(map).get(address);
+        Collection<Range> myRanges = replicationStrategy.getAddressRanges(map).get(address);
         map.remove(token);
 
-        Map<Range, Set<InetAddress>> myRangeAddresses = new HashMap<Range, Set<InetAddress>>();
-        Map<Range, Set<InetAddress>> rangeAddresses = replicationStrategy.getRangeAddresses(map);
+        Multimap<Range, InetAddress> myRangeAddresses = HashMultimap.create();
+        Multimap<Range, InetAddress> rangeAddresses = replicationStrategy.getRangeAddresses(map);
         for (Range range : rangeAddresses.keySet())
         {
             for (Range myRange : myRanges)
             {
                 if (range.contains(myRange.right()))
                 {
-                    assert !myRangeAddresses.containsKey(myRange);
-                    myRangeAddresses.put(myRange, rangeAddresses.get(range));
+                    myRangeAddresses.putAll(myRange, rangeAddresses.get(range));
+                    break;
                 }
             }
         }
@@ -179,18 +176,18 @@
         return btc.getToken();
     }
 
-    static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget)
+    static Multimap<InetAddress, Range> getWorkMap(Multimap<Range, InetAddress> rangesWithSourceTarget)
     {
         return getWorkMap(rangesWithSourceTarget, FailureDetector.instance());
     }
 
-    static Map<InetAddress, List<Range>> getWorkMap(Map<Range, Set<InetAddress>> rangesWithSourceTarget, IFailureDetector failureDetector)
+    static Multimap<InetAddress, Range> getWorkMap(Multimap<Range, InetAddress> rangesWithSourceTarget, IFailureDetector failureDetector)
     {
         /*
          * Map whose key is the source node and the value is a map whose key is the
          * target and value is the list of ranges to be sent to it.
         */
-        Map<InetAddress, List<Range>> sources = new HashMap<InetAddress, List<Range>>();
+        Multimap<InetAddress, Range> sources = ArrayListMultimap.create();
 
         // TODO look for contiguous ranges and map them to the same source
         for (Range range : rangesWithSourceTarget.keySet())
@@ -199,13 +196,7 @@
             {
                 if (failureDetector.isAlive(source))
                 {
-                    List<Range> ranges = sources.get(source);
-                    if (ranges == null)
-                    {
-                        ranges = new ArrayList<Range>();
-                        sources.put(source, ranges);
-                    }
-                    ranges.add(range);
+                    sources.put(source, range);
                     break;
                 }
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadata.java Tue Nov  3 02:23:53 2009
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Collection;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.CompactEndPointSerializationHelper;
@@ -51,9 +52,9 @@
     }
     
     protected InetAddress target_;
-    protected List<Range> ranges_;
+    protected Collection<Range> ranges_;
     
-    BootstrapMetadata(InetAddress target, List<Range> ranges)
+    BootstrapMetadata(InetAddress target, Collection<Range> ranges)
     {
         target_ = target;
         ranges_ = ranges;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java Tue Nov  3 02:23:53 2009
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Collection;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
@@ -93,7 +94,7 @@
      * locally for each range and then stream them using
      * the Bootstrap protocol to the target endpoint.
     */
-    private void doTransfer(InetAddress target, List<Range> ranges) throws IOException
+    private void doTransfer(InetAddress target, Collection<Range> ranges) throws IOException
     {
         if ( ranges.size() == 0 )
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Tue Nov  3 02:23:53 2009
@@ -23,8 +23,8 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.commons.lang.StringUtils;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -33,7 +33,6 @@
 import org.apache.cassandra.service.InvalidRequestException;
 import org.apache.cassandra.service.QuorumResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.config.DatabaseDescriptor;
 
 /**
  * This class contains a helper method that will be used by
@@ -177,46 +176,39 @@
 
     // TODO this is pretty inefficient. also the inverse (getRangeAddresses) below.
     // fixing this probably requires merging tokenmetadata into replicationstrategy, so we can cache/invalidate cleanly
-    public Map<InetAddress, Set<Range>> getAddressRanges(Map<Token, InetAddress> tokenMap)
+    public Multimap<InetAddress, Range> getAddressRanges(Map<Token, InetAddress> metadata)
     {
-        Map<InetAddress, Set<Range>> map = new HashMap<InetAddress, Set<Range>>();
-
-        for (InetAddress ep : tokenMap.values())
-        {
-            map.put(ep, new HashSet<Range>());
-        }
+        Multimap<InetAddress, Range> map = HashMultimap.create();
 
-        for (Token token : tokenMap.keySet())
+        for (Token token : metadata.keySet())
         {
-            Range range = getPrimaryRangeFor(token, tokenMap);
-            for (InetAddress ep : getNaturalEndpoints(token, tokenMap))
+            Range range = getPrimaryRangeFor(token, metadata);
+            for (InetAddress ep : getNaturalEndpoints(token, metadata))
             {
-                map.get(ep).add(range);
+                map.put(ep, range);
             }
         }
 
         return map;
     }
 
-    public Map<Range, Set<InetAddress>> getRangeAddresses(Map<Token, InetAddress> tokenMap)
+    public Multimap<Range, InetAddress> getRangeAddresses(Map<Token, InetAddress> metadata)
     {
-        Map<Range, Set<InetAddress>> map = new HashMap<Range, Set<InetAddress>>();
+        Multimap<Range, InetAddress> map = HashMultimap.create();
 
-        for (Token token : tokenMap.keySet())
+        for (Token token : metadata.keySet())
         {
-            Range range = getPrimaryRangeFor(token, tokenMap);
-            HashSet<InetAddress> addresses = new HashSet<InetAddress>();
-            for (InetAddress ep : getNaturalEndpoints(token, tokenMap))
+            Range range = getPrimaryRangeFor(token, metadata);
+            for (InetAddress ep : getNaturalEndpoints(token, metadata))
             {
-                addresses.add(ep);
+                map.put(range, ep);
             }
-            map.put(range, addresses);
         }
 
         return map;
     }
 
-    public Map<InetAddress, Set<Range>> getAddressRanges()
+    public Multimap<InetAddress, Range> getAddressRanges()
     {
         return getAddressRanges(tokenMetadata_.cloneTokenEndPointMap());
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Nov  3 02:23:53 2009
@@ -84,7 +84,7 @@
         return partitioner_;
     }
 
-    public Set<Range> getLocalRanges()
+    public Collection<Range> getLocalRanges()
     {
         return getRangesForEndPoint(FBUtilities.getLocalAddress());
     }
@@ -672,7 +672,7 @@
      * @param ep endpoint we are interested in.
      * @return ranges for the specified endpoint.
      */
-    Set<Range> getRangesForEndPoint(InetAddress ep)
+    Collection<Range> getRangesForEndPoint(InetAddress ep)
     {
         return replicationStrategy_.getAddressRanges().get(ep);
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=832266&r1=832265&r2=832266&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Tue Nov  3 02:23:53 2009
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Collection;
 
 import org.apache.commons.lang.StringUtils;
 import static org.junit.Assert.assertEquals;
@@ -32,6 +33,7 @@
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import com.google.common.collect.Multimap;
 
 public class BootStrapperTest {
     @Test
@@ -54,10 +56,10 @@
         TokenMetadata tmd = ss.getTokenMetadata();
         assertEquals(numOldNodes, tmd.cloneTokenEndPointMap().size());
         BootStrapper b = new BootStrapper(ss.getReplicationStrategy(), myEndpoint, myToken, tmd);
-        Map<Range, Set<InetAddress>> res = b.getRangesWithSources();
+        Multimap<Range, InetAddress> res = b.getRangesWithSources();
         
         int transferCount = 0;
-        for (Map.Entry<Range, Set<InetAddress>> e : res.entrySet())
+        for (Map.Entry<Range, Collection<InetAddress>> e : res.asMap().entrySet())
         {
             assert e.getValue() != null && e.getValue().size() > 0 : StringUtils.join(e.getValue(), ", ");
             transferCount++;
@@ -77,9 +79,9 @@
             public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
             public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
         };
-        Map<InetAddress, List<Range>> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
+        Multimap<InetAddress, Range> temp = BootStrapper.getWorkMap(res, mockFailureDetector);
         assertEquals(1, temp.keySet().size());
-        assertEquals(1, temp.values().iterator().next().size());
+        assertEquals(1, temp.asMap().values().iterator().next().size());
         assert !temp.keySet().iterator().next().equals(myEndpoint);
     }
 



Mime
View raw message