cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] git commit: Move BatchlogEndpointSelector to BatchlogManager.EndpointFilter + minor refactorings
Date Sun, 04 May 2014 00:34:59 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk aca945d79 -> f7df2fec8


Move BatchlogEndpointSelector to BatchlogManager.EndpointFilter + minor refactorings


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f486193a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f486193a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f486193a

Branch: refs/heads/trunk
Commit: f486193ab5654881986a7f7b655fee3aedabad15
Parents: f12a8a3
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Sun May 4 03:33:03 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Sun May 4 03:33:03 2014 +0300

----------------------------------------------------------------------
 .../apache/cassandra/db/BatchlogManager.java    |  80 ++++++++++++-
 .../service/BatchlogEndpointSelector.java       | 110 -----------------
 .../apache/cassandra/service/StorageProxy.java  |   7 +-
 .../service/BatchlogEndpointFilterTest.java     | 117 +++++++++++++++++++
 .../service/BatchlogEndpointSelectorTest.java   | 116 ------------------
 5 files changed, 197 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index fee686b..de51c8d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -30,7 +30,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
+import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -364,4 +364,82 @@ public class BatchlogManager implements BatchlogManagerMBean
     {
         return QueryProcessor.processInternal(String.format(format, args));
     }
+
+    public static class EndpointFilter
+    {
+        private final String localRack;
+        private final Multimap<String, InetAddress> endpoints;
+
+        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        {
+            this.localRack = localRack;
+            this.endpoints = endpoints;
+        }
+
+        /**
+         * @return list of candidates for batchlog hosting. If possible these will be two
nodes from different racks.
+         */
+        public Collection<InetAddress> filter()
+        {
+            // special case for single-node data centers
+            if (endpoints.values().size() == 1)
+                return endpoints.values();
+
+            // strip out dead endpoints and localhost
+            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
+            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
+                if (isValid(entry.getValue()))
+                    validated.put(entry.getKey(), entry.getValue());
+
+            if (validated.size() <= 2)
+                return validated.values();
+
+            if (validated.size() - validated.get(localRack).size() >= 2)
+            {
+                // we have enough endpoints in other racks
+                validated.removeAll(localRack);
+            }
+
+            if (validated.keySet().size() == 1)
+            {
+                // we have only 1 `other` rack
+                Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
+                return Lists.newArrayList(Iterables.limit(otherRack, 2));
+            }
+
+            // randomize which racks we pick from if more than 2 remaining
+            Collection<String> racks;
+            if (validated.keySet().size() == 2)
+            {
+                racks = validated.keySet();
+            }
+            else
+            {
+                racks = Lists.newArrayList(validated.keySet());
+                Collections.shuffle((List) racks);
+            }
+
+            // grab a random member of up to two racks
+            List<InetAddress> result = new ArrayList<>(2);
+            for (String rack : Iterables.limit(racks, 2))
+            {
+                List<InetAddress> rackMembers = validated.get(rack);
+                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
+            }
+
+            return result;
+        }
+
+        @VisibleForTesting
+        protected boolean isValid(InetAddress input)
+        {
+            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
+        }
+
+        @VisibleForTesting
+        protected int getRandomInt(int bound)
+        {
+            return FBUtilities.threadLocalRandom().nextInt(bound);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java b/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
deleted file mode 100644
index bf032f5..0000000
--- a/src/java/org/apache/cassandra/service/BatchlogEndpointSelector.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.utils.FBUtilities;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-
-public class BatchlogEndpointSelector
-{
-    private final String localRack;
-    
-    public BatchlogEndpointSelector(String localRack)
-    {
-        this.localRack = localRack;
-    }
-
-    /**
-     * @param endpoints nodes in the local datacenter, grouped by rack name
-     * @return list of candidates for batchlog hosting.  if possible these will be two nodes
from different racks.
-     */
-    public Collection<InetAddress> chooseEndpoints(Multimap<String, InetAddress>
endpoints)
-    {
-        // strip out dead endpoints and localhost
-        ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
-        for (Map.Entry<String, InetAddress> entry : endpoints.entries())
-        {
-            if (isValid(entry.getValue()))
-                validated.put(entry.getKey(), entry.getValue());
-        }
-        if (validated.size() <= 2)
-            return validated.values();
-
-        if ((validated.size() - validated.get(localRack).size()) >= 2)
-        {
-            // we have enough endpoints in other racks
-            validated.removeAll(localRack);
-        }
-
-        if (validated.keySet().size() == 1)
-        {
-            // we have only 1 `other` rack
-            Collection<InetAddress> otherRack = Iterables.getOnlyElement(validated.asMap().values());
-            return Lists.newArrayList(Iterables.limit(otherRack, 2));
-        }
-
-        // randomize which racks we pick from if more than 2 remaining
-        Collection<String> racks;
-        if (validated.keySet().size() == 2)
-        {
-            racks = validated.keySet();
-        }
-        else
-        {
-            racks = Lists.newArrayList(validated.keySet());
-            Collections.shuffle((List) racks);
-        }
-
-        // grab a random member of up to two racks
-        List<InetAddress> result = new ArrayList<>(2);
-        for (String rack : Iterables.limit(racks, 2))
-        {
-            List<InetAddress> rackMembers = validated.get(rack);
-            result.add(rackMembers.get(getRandomInt(rackMembers.size())));
-        }
-
-        return result;
-    }
-    
-    @VisibleForTesting
-    protected boolean isValid(InetAddress input)
-    {
-        return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
-    }
-    
-    @VisibleForTesting
-    protected int getRandomInt(int bound)
-    {
-        return FBUtilities.threadLocalRandom().nextInt(bound);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 04c3641..d01dd99 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -741,14 +741,9 @@ public class StorageProxy implements StorageProxyMBean
     {
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
         Multimap<String, InetAddress> localEndpoints = HashMultimap.create(topology.getDatacenterRacks().get(localDataCenter));
-        
-        // special case for single-node datacenters
-        if (localEndpoints.size() == 1)
-            return localEndpoints.values();
-
         String localRack = DatabaseDescriptor.getEndpointSnitch().getRack(FBUtilities.getBroadcastAddress());
-        Collection<InetAddress> chosenEndpoints = new BatchlogEndpointSelector(localRack).chooseEndpoints(localEndpoints);
 
+        Collection<InetAddress> chosenEndpoints = new BatchlogManager.EndpointFilter(localRack,
localEndpoints).filter();
         if (chosenEndpoints.isEmpty())
         {
             if (consistencyLevel == ConsistencyLevel.ANY)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
new file mode 100644
index 0000000..72e8df5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/BatchlogEndpointFilterTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.db.BatchlogManager;
+
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+
+public class BatchlogEndpointFilterTest
+{
+    private static final String LOCAL = "local";
+
+    @Test
+    public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .put("1", InetAddress.getByName("11"))
+                .put("2", InetAddress.getByName("2"))
+                .put("2", InetAddress.getByName("22"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+    }
+
+    @Test
+    public void shouldSelectHostFromLocal() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+    }
+
+    @Test
+    public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(1));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+    }
+
+    @Test
+    public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .put("1", InetAddress.getByName("11"))
+                .put("1", InetAddress.getByName("111"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+    }
+
+    private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+    {
+        public TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        {
+            super(localRack, endpoints);
+        }
+
+        @Override
+        protected boolean isValid(InetAddress input)
+        {
+            // We will use always alive non-localhost endpoints
+            return true;
+        }
+
+        @Override
+        protected int getRandomInt(int bound)
+        {
+            // We don't need random behavior here
+            return bound - 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f486193a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java b/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
deleted file mode 100644
index 293078d..0000000
--- a/test/unit/org/apache/cassandra/service/BatchlogEndpointSelectorTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collection;
-
-import org.junit.Test;
-import org.junit.matchers.JUnitMatchers;
-
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-
-public class BatchlogEndpointSelectorTest
-{
-    private final BatchlogEndpointSelector target;
-    private static final String LOCAL = "local";
-    
-
-    public BatchlogEndpointSelectorTest() throws UnknownHostException
-    {
-        target = new BatchlogEndpointSelector(LOCAL)
-        {
-            @Override
-            protected boolean isValid(InetAddress input)
-            {   
-                //we will use always alive non-localhost endpoints
-                return true;
-            }
-
-            @Override
-            protected int getRandomInt(int bound)
-            {
-                //we don't need a random behavior here
-                return bound - 1;
-            }
-        };
-    }
-    
-    @Test
-    public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .put("1", InetAddress.getByName("11"))
-                .put("2", InetAddress.getByName("2"))
-                .put("2", InetAddress.getByName("22"))
-                .build();
-        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
-        assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
-    }
-    
-    @Test
-    public void shouldSelectHostFromLocal() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .build();
-        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
-        assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
-    }
-    
-    @Test
-    public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .build();
-        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
-        assertThat(result.size(), is(1));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
-    }
-    
-    @Test
-    public void shouldSelectTwoFirstHostsFromSingleOtherRack() throws UnknownHostException
-    {
-        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress>
builder()
-                .put(LOCAL, InetAddress.getByName("0"))
-                .put(LOCAL, InetAddress.getByName("00"))
-                .put("1", InetAddress.getByName("1"))
-                .put("1", InetAddress.getByName("11"))
-                .put("1", InetAddress.getByName("111"))
-                .build();
-        Collection<InetAddress> result = target.chooseEndpoints(endpoints);
-        assertThat(result.size(), is(2));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
-        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
-    }
-}


Mime
View raw message