cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject git commit: (Hadoop) fix cluster initialisation for a split fetching
Date Wed, 20 Aug 2014 17:40:24 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 44cfd958a -> fe39eb7a9


(Hadoop) fix cluster initialisation for a split fetching

patch by Jacek Lewandowski; reviewed by Alex Liu for CASSANDRA-7774


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe39eb7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe39eb7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe39eb7a

Branch: refs/heads/cassandra-2.0
Commit: fe39eb7a9e2b017e3cd31b1c09693c8d565dee18
Parents: 44cfd95
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>
Authored: Wed Aug 20 20:39:12 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Aug 20 20:39:12 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  89 +--------
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  19 +-
 ...mitedLocalNodeFirstLocalBalancingPolicy.java | 185 +++++++++++++++++++
 4 files changed, 198 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 17c0671..71cfca0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * (Hadoop) fix cluster initialisation for a split fetching (CASSANDRA-7774)
  * Configure system.paxos with LeveledCompactionStrategy (CASSANDRA-7753)
  * Fix ALTER clustering column type from DateType to TimestampType when
    using DESC clustering order (CASSANRDA-7797)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index e894996..137bddf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -288,16 +288,22 @@ public class CqlConfigHelper
 
     public static Cluster getInputCluster(String host, Configuration conf)
     {
+        // this method has been left for backward compatibility
+        return getInputCluster(new String[] {host}, conf);
+    }
+
+    public static Cluster getInputCluster(String[] hosts, Configuration conf)
+    {
         int port = getInputNativePort(conf);
         Optional<AuthProvider> authProvider = getAuthProvider(conf);
         Optional<SSLOptions> sslOptions = getSSLOptions(conf);
-        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, host);
+        LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
         SocketOptions socketOptions = getReadSocketOptions(conf);
         QueryOptions queryOptions = getReadQueryOptions(conf);
         PoolingOptions poolingOptions = getReadPoolingOptions(conf);
         
         Cluster.Builder builder = Cluster.builder()
-                                         .addContactPoint(host)
+                                         .addContactPoints(hosts)
                                          .withPort(port)
                                          .withCompression(ProtocolOptions.Compression.NONE);
 
@@ -480,84 +486,9 @@ public class CqlConfigHelper
         return socketOptions;
     }
 
-    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final
String stickHost)
+    private static LoadBalancingPolicy getReadLoadBalancingPolicy(Configuration conf, final
String[] stickHosts)
     {
-        return new LoadBalancingPolicy()
-        {
-            private Host origHost;
-            private Set<Host> liveRemoteHosts = Sets.newHashSet();
-
-            @Override
-            public void onAdd(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = host;
-            }
-
-            @Override
-            public void onDown(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = null;
-                liveRemoteHosts.remove(host);
-            }
-
-            @Override
-            public void onRemove(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = null;
-                liveRemoteHosts.remove(host);
-            }
-
-            @Override
-            public void onUp(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    origHost = host;
-                liveRemoteHosts.add(host);
-            }
-
-            @Override
-            public void onSuspected(Host host)
-            {
-            }
-
-            @Override
-            public HostDistance distance(Host host)
-            {
-                if (host.getAddress().getHostName().equals(stickHost))
-                    return HostDistance.LOCAL;
-                else
-                    return HostDistance.REMOTE;
-            }
-
-            @Override
-            public void init(Cluster cluster, Collection<Host> hosts)
-            {
-                for (Host host : hosts)
-                {
-                    if (host.getAddress().getHostName().equals(stickHost))
-                    {
-                        origHost = host;
-                        break;
-                    }
-                }
-            }
-
-            @Override
-            public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
-            {
-                if (origHost != null)
-                {
-                    return Iterators.concat(Collections.singletonList(origHost).iterator(),
liveRemoteHosts.iterator());
-                }
-                else
-                {
-                    return liveRemoteHosts.iterator();
-                }
-            }
-        };
+        return new LimitedLocalNodeFirstLocalBalancingPolicy(stickHosts);
     }
 
     private static Optional<AuthProvider> getAuthProvider(Configuration conf)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9167ac3..3eab7c0 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -124,24 +124,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
             if (cluster != null)
                 return;
 
-            // create connection using thrift
+            // create a Cluster instance
             String[] locations = split.getLocations();
-            Exception lastException = null;
-            for (String location : locations)
-            {
-                try
-                {
-                    cluster = CqlConfigHelper.getInputCluster(location, conf);
-                    break;
-                }
-                catch (Exception e)
-                {
-                    lastException = e;
-                    logger.warn("Failed to create authenticated client to {}", location);
-                }
-            }
-            if (cluster == null && lastException != null)
-                throw lastException;
+            cluster = CqlConfigHelper.getInputCluster(locations, conf);
         }
         catch (Exception e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe39eb7a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
new file mode 100644
index 0000000..3aa7df0
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/LimitedLocalNodeFirstLocalBalancingPolicy.java
@@ -0,0 +1,185 @@
+package org.apache.cassandra.hadoop.cql3;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * This load balancing policy is intended to be used only for CqlRecordReader when it fetches
a particular split.
+ * <p/>
+ * It chooses alive hosts only from the set of the given replicas - because the connection
is used to load the data from
+ * the particular split, with a strictly defined list of the replicas, it is pointless to
try the other nodes.
+ * The policy tracks which of the replicas are alive, and when a new query plan is requested,
it returns those replicas
+ * in the following order:
+ * <ul>
+ * <li>the local node</li>
+ * <li>the collection of the remaining hosts (which is shuffled on each request)</li>
+ * </ul>
+ */
+class LimitedLocalNodeFirstLocalBalancingPolicy implements LoadBalancingPolicy
+{
+    private final static Logger logger = LoggerFactory.getLogger(LimitedLocalNodeFirstLocalBalancingPolicy.class);
+
+    private final static Set<InetAddress> localAddresses = Collections.unmodifiableSet(getLocalInetAddresses());
+
+    private final CopyOnWriteArraySet<Host> liveReplicaHosts = new CopyOnWriteArraySet<>();
+
+    private final Set<InetAddress> replicaAddresses = new HashSet<>();
+
+    public LimitedLocalNodeFirstLocalBalancingPolicy(String[] replicas)
+    {
+        for (String replica : replicas)
+        {
+            try
+            {
+                InetAddress[] addresses = InetAddress.getAllByName(replica);
+                Collections.addAll(replicaAddresses, addresses);
+            }
+            catch (UnknownHostException e)
+            {
+                logger.warn("Invalid replica host name: {}, skipping it", replica);
+            }
+        }
+        logger.debug("Created instance with the following replicas: {}", Arrays.asList(replicas));
+    }
+
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        List<Host> replicaHosts = new ArrayList<>();
+        for (Host host : hosts)
+        {
+            if (replicaAddresses.contains(host.getAddress()))
+            {
+                replicaHosts.add(host);
+            }
+        }
+        liveReplicaHosts.addAll(replicaHosts);
+        logger.debug("Initialized with replica hosts: {}", replicaHosts);
+    }
+
+    @Override
+    public HostDistance distance(Host host)
+    {
+        if (isLocalHost(host))
+        {
+            return HostDistance.LOCAL;
+        }
+        else
+        {
+            return HostDistance.REMOTE;
+        }
+    }
+
+    @Override
+    public Iterator<Host> newQueryPlan(String keyspace, Statement statement)
+    {
+        List<Host> local = new ArrayList<>(1);
+        List<Host> remote = new ArrayList<>(liveReplicaHosts.size());
+        for (Host liveReplicaHost : liveReplicaHosts)
+        {
+            if (isLocalHost(liveReplicaHost))
+            {
+                local.add(liveReplicaHost);
+            }
+            else
+            {
+                remote.add(liveReplicaHost);
+            }
+        }
+
+        Collections.shuffle(remote);
+
+        logger.debug("Using the following hosts order for the new query plan: {} | {}", local,
remote);
+
+        return Iterators.concat(local.iterator(), remote.iterator());
+    }
+
+    @Override
+    public void onAdd(Host host)
+    {
+        if (replicaAddresses.contains(host.getAddress()))
+        {
+            liveReplicaHosts.add(host);
+            logger.debug("Added a new host {}", host);
+        }
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+        if (replicaAddresses.contains(host.getAddress()))
+        {
+            liveReplicaHosts.add(host);
+            logger.debug("The host {} is now up", host);
+        }
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+        if (liveReplicaHosts.remove(host))
+        {
+            logger.debug("The host {} is now down", host);
+        }
+    }
+
+
+    @Override
+    public void onRemove(Host host)
+    {
+        if (liveReplicaHosts.remove(host))
+        {
+            logger.debug("Removed the host {}", host);
+        }
+    }
+
+    @Override
+    public void onSuspected(Host host)
+    {
+        // not supported by this load balancing policy
+    }
+
+    private static boolean isLocalHost(Host host)
+    {
+        InetAddress hostAddress = host.getAddress();
+        return hostAddress.isLoopbackAddress() || localAddresses.contains(hostAddress);
+    }
+
+    private static Set<InetAddress> getLocalInetAddresses()
+    {
+        try
+        {
+            return Sets.newHashSet(Iterators.concat(
+                    Iterators.transform(
+                            Iterators.forEnumeration(NetworkInterface.getNetworkInterfaces()),
+                            new Function<NetworkInterface, Iterator<InetAddress>>()
+                            {
+                                @Override
+                                public Iterator<InetAddress> apply(NetworkInterface
netIface)
+                                {
+                                    return Iterators.forEnumeration(netIface.getInetAddresses());
+                                }
+                            })));
+        }
+        catch (SocketException e)
+        {
+            logger.warn("Could not retrieve local network interfaces.", e);
+            return Collections.emptySet();
+        }
+    }
+}


Mime
View raw message