Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A77FD10400 for ; Wed, 28 May 2014 20:27:18 +0000 (UTC) Received: (qmail 68454 invoked by uid 500); 28 May 2014 20:27:18 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 68396 invoked by uid 500); 28 May 2014 20:27:18 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 68380 invoked by uid 99); 28 May 2014 20:27:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 28 May 2014 20:27:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 20ACA4E0CC; Wed, 28 May 2014 20:27:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aleksey@apache.org To: commits@cassandra.apache.org Date: Wed, 28 May 2014 20:27:18 -0000 Message-Id: <5f254f983d664630867130782f37db68@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] git commit: Add authentication support to shuffle Repository: cassandra Updated Branches: refs/heads/trunk a53f5d300 -> d079c76d2 Add authentication support to shuffle patch by Aleksey Yeschenko; reviewed by Brandon Williams for CASSANDRA-6484 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/35efdc1e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/35efdc1e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/35efdc1e Branch: refs/heads/trunk Commit: 35efdc1e391bf6f7d9ab1b27bd428902478a7923 Parents: 999bee7 Author: Aleksey Yeschenko Authored: Wed May 28 23:19:54 2014 +0300 Committer: Aleksey Yeschenko Committed: Wed May 28 23:19:54 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 3 + .../org/apache/cassandra/tools/Shuffle.java | 287 ++++++++----------- 2 files changed, 124 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/35efdc1e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 94a1f06..523dda3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,8 @@ * Copy compaction options to make sure they are reloaded (CASSANDRA-7290) * Add option to do more aggressive tombstone compactions (CASSANDRA-6563) * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288) + * Add authentication support to shuffle (CASSANDRA-6484) + 2.0.8 * Always reallocate buffers in HSHA (CASSANDRA-6285) @@ -65,6 +67,7 @@ Merged from 1.2: * reduce garbage creation in calculatePendingRanges (CASSANDRA-7191) * exit CQLSH with error status code if script fails (CASSANDRA-6344) + 2.0.7 * Put nodes in hibernate when join_ring is false (CASSANDRA-6961) * Avoid early loading of non-system keyspaces before compaction-leftovers http://git-wip-us.apache.org/repos/asf/cassandra/blob/35efdc1e/src/java/org/apache/cassandra/tools/Shuffle.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/Shuffle.java b/src/java/org/apache/cassandra/tools/Shuffle.java index b189ccb..c1ebafe 100644 --- a/src/java/org/apache/cassandra/tools/Shuffle.java +++ b/src/java/org/apache/cassandra/tools/Shuffle.java @@ -1,4 +1,3 @@ -package org.apache.cassandra.tools; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,14 +18,13 @@ package org.apache.cassandra.tools; * under the License. * */ - +package org.apache.cassandra.tools; import java.io.Closeable; import java.io.IOException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -37,38 +35,28 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; - import javax.management.JMX; import javax.management.MBeanServerConnection; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.MissingArgumentException; + import org.apache.cassandra.serializers.TimestampSerializer; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; import org.apache.cassandra.service.StorageServiceMBean; -import org.apache.cassandra.thrift.Cassandra; -import org.apache.cassandra.thrift.Compression; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.CqlResult; -import org.apache.cassandra.thrift.CqlRow; -import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.TimedOutException; -import org.apache.cassandra.thrift.TokenRange; -import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.MissingArgumentException; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TFastFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; -import org.apache.thrift.transport.TTransportException; - -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; public class Shuffle extends AbstractJmxClient { @@ -80,37 +68,45 @@ public class Shuffle extends AbstractJmxClient private final String thriftHost; private final int thriftPort; private final boolean thriftFramed; + private final String thriftUsername; + private final String thriftPassword; static { - addCmdOption("th", "thrift-host", true, "Thrift hostname or IP address (Default: JMX host)"); - addCmdOption("tp", "thrift-port", true, "Thrift port number (Default: 9160)"); - addCmdOption("tf", "thrift-framed", false, "Enable framed transport for Thrift (Default: false)"); - addCmdOption("en", "and-enable", true, "Immediately enable shuffling (create only)"); - addCmdOption("dc", "only-dc", true, "Apply only to named DC (create only)"); - } - - public Shuffle(String host, int port) throws IOException - { - this(host, port, host, 9160, false, null, null); + addCmdOption("th", "thrift-host", true, "Thrift hostname or IP address (Default: JMX host)"); + addCmdOption("tp", "thrift-port", true, "Thrift port number (Default: 9160)"); + addCmdOption("tf", "thrift-framed", false, "Enable framed transport for Thrift (Default: false)"); + addCmdOption("tu", "thrift-user", true, "Thrift username"); + addCmdOption("tpw", "thrift-password", true, "Thrift password"); + addCmdOption("en", "and-enable", true, "Immediately enable shuffling (create only)"); + addCmdOption("dc", "only-dc", true, "Apply only to named DC (create only)"); } - public Shuffle(String host, int port, String thriftHost, int thriftPort, boolean thriftFramed, String username, String password) - throws IOException + public Shuffle(String host, + int port, + String thriftHost, + int thriftPort, + boolean thriftFramed, + String jmxUsername, + String jmxPassword, + String thriftUsername, + String thriftPassword) throws IOException { - super(host, port, username, password); + super(host, port, jmxUsername, jmxPassword); this.thriftHost = thriftHost; this.thriftPort = thriftPort; this.thriftFramed = thriftFramed; + this.thriftUsername = thriftUsername; + this.thriftPassword = thriftPassword; // Setup the StorageService proxy. ssProxy = getSSProxy(jmxConn.getMbeanServerConn()); } - public StorageServiceMBean getSSProxy(MBeanServerConnection mbeanConn) + private StorageServiceMBean getSSProxy(MBeanServerConnection mbeanConn) { - StorageServiceMBean proxy = null; + StorageServiceMBean proxy; try { ObjectName name = new ObjectName(ssObjName); @@ -123,9 +119,9 @@ public class Shuffle extends AbstractJmxClient return proxy; } - public EndpointSnitchInfoMBean getEpSnitchProxy(MBeanServerConnection mbeanConn) + private EndpointSnitchInfoMBean getEpSnitchProxy(MBeanServerConnection mbeanConn) { - EndpointSnitchInfoMBean proxy = null; + EndpointSnitchInfoMBean proxy; try { ObjectName name = new ObjectName(epSnitchObjName); @@ -144,12 +140,12 @@ public class Shuffle extends AbstractJmxClient * @param endpointMap current mapping of endpoint to tokens * @return a new mapping of endpoint to tokens */ - public Multimap calculateRelocations(Multimap endpointMap) + private Multimap calculateRelocations(Multimap endpointMap) { Multimap relocations = HashMultimap.create(); - Set endpoints = new HashSet(endpointMap.keySet()); - Map endpointToNumTokens = new HashMap(endpoints.size()); - Map> iterMap = new HashMap>(endpoints.size()); + Set endpoints = new HashSet<>(endpointMap.keySet()); + Map endpointToNumTokens = new HashMap<>(endpoints.size()); + Map> iterMap = new HashMap<>(endpoints.size()); // Create maps of endpoint to token iterators, and endpoint to number of tokens. for (String endpoint : endpoints) @@ -167,9 +163,8 @@ public class Shuffle extends AbstractJmxClient } int epsToComplete = endpoints.size(); - Set endpointsCompleted = new HashSet(); + Set endpointsCompleted = new HashSet<>(); - outer: while (true) { endpoints.removeAll(endpointsCompleted); @@ -186,7 +181,7 @@ public class Shuffle extends AbstractJmxClient String token = iterMap.get(endpoint).next(); - List subSet = new ArrayList(endpoints); + List subSet = new ArrayList<>(endpoints); subSet.remove(endpoint); Collections.shuffle(subSet, rand); @@ -206,7 +201,7 @@ public class Shuffle extends AbstractJmxClient // We're done when we've exhausted all of the token iterators if (endpointsCompleted.size() == epsToComplete) - break outer; + break; } return relocations; @@ -215,19 +210,9 @@ public class Shuffle extends AbstractJmxClient /** * Enable relocations. * - * @param endpoints sequence of hostname or IP strings - */ - public void enableRelocations(String...endpoints) - { - enableRelocations(Arrays.asList(endpoints)); - } - - /** - * Enable relocations. - * * @param endpoints Collection of hostname or IP strings */ - public void enableRelocations(Collection endpoints) + private void enableRelocations(Collection endpoints) { for (String endpoint : endpoints) { @@ -247,19 +232,9 @@ public class Shuffle extends AbstractJmxClient /** * Disable relocations. * - * @param endpoints sequence of hostname or IP strings - */ - public void disableRelocations(String...endpoints) - { - disableRelocations(Arrays.asList(endpoints)); - } - - /** - * Disable relocations. - * * @param endpoints Collection of hostname or IP strings */ - public void disableRelocations(Collection endpoints) + private void disableRelocations(Collection endpoints) { for (String endpoint : endpoints) { @@ -282,7 +257,7 @@ public class Shuffle extends AbstractJmxClient * @return String endpoint names * @throws ShuffleError */ - public Collection getLiveNodes() throws ShuffleError + private Collection getLiveNodes() throws ShuffleError { try { @@ -302,15 +277,13 @@ public class Shuffle extends AbstractJmxClient */ public void shuffle(boolean enable, String onlyDc) throws ShuffleError { - CassandraClient seedClient = null; - Map tokenMap = null; - IPartitioner partitioner = null; + Map tokenMap; Multimap endpointMap = HashMultimap.create(); EndpointSnitchInfoMBean epSnitchProxy = getEpSnitchProxy(jmxConn.getMbeanServerConn()); try { - seedClient = getThriftClient(thriftHost, thriftPort, thriftFramed); + CassandraClient seedClient = getThriftClient(thriftHost); tokenMap = seedClient.describe_token_map(); for (Map.Entry entry : tokenMap.entrySet()) @@ -338,25 +311,23 @@ public class Shuffle extends AbstractJmxClient } catch (TException e) { - throw new ShuffleError( - String.format("Thrift request to %s:%d failed: %s", thriftHost, thriftPort, e.getMessage())); + throw new ShuffleError(String.format("Thrift request to %s:%d failed: %s", thriftHost, thriftPort, e.getMessage())); } - partitioner = getPartitioner(thriftHost, thriftPort, thriftFramed); - Multimap relocations = calculateRelocations(endpointMap); writeln("%-42s %-15s %-15s", "Token", "From", "To"); writeln("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~+~~~~~~~~~~~~~~~"); + IPartitioner partitioner = getPartitioner(); + // Store relocations on remote nodes. for (String endpoint : relocations.keySet()) { for (String tok : relocations.get(endpoint)) writeln("%-42s %-15s %-15s", tok, tokenMap.get(tok), endpoint); - String cqlQuery = createShuffleBatchInsert(relocations.get(endpoint), partitioner); - executeCqlQuery(endpoint, thriftPort, thriftFramed, cqlQuery); + executeCqlQuery(endpoint, createShuffleBatchInsert(relocations.get(endpoint), partitioner)); } if (enable) @@ -371,8 +342,8 @@ public class Shuffle extends AbstractJmxClient public void ls() throws ShuffleError { Map> queuedRelocations = listRelocations(); - IPartitioner partitioner = getPartitioner(thriftHost, thriftPort, thriftFramed); boolean justOnce = false; + IPartitioner partitioner = getPartitioner(); for (String host : queuedRelocations.keySet()) { @@ -400,17 +371,16 @@ public class Shuffle extends AbstractJmxClient /** * List pending token relocations for all nodes. * - * @return * @throws ShuffleError */ private Map> listRelocations() throws ShuffleError { String cqlQuery = "SELECT token_bytes,requested_at FROM system.range_xfers"; - Map> results = new HashMap>(); + Map> results = new HashMap<>(); for (String host : getLiveNodes()) { - CqlResult result = executeCqlQuery(host, thriftPort, thriftFramed, cqlQuery); + CqlResult result = executeCqlQuery(host, cqlQuery); results.put(host, result.getRows()); } @@ -428,7 +398,6 @@ public class Shuffle extends AbstractJmxClient for (String host : queuedRelocations.keySet()) { - for (CqlRow row : queuedRelocations.get(host)) { assert row.getColumns().size() == 2; @@ -436,7 +405,7 @@ public class Shuffle extends AbstractJmxClient ByteBuffer tokenBytes = ByteBuffer.wrap(row.getColumns().get(0).getValue()); String query = String.format("DELETE FROM system.range_xfers WHERE token_bytes = 0x%s", ByteBufferUtil.bytesToHex(tokenBytes)); - executeCqlQuery(host, thriftPort, thriftFramed, query); + executeCqlQuery(host, query); } } } @@ -465,18 +434,16 @@ public class Shuffle extends AbstractJmxClient * Setup and return a new Thrift RPC connection. * * @param hostName hostname or address to connect to - * @param port port number to connect to - * @param framed wrap with framed transport if true * @return a CassandraClient instance * @throws ShuffleError */ - public static CassandraClient getThriftClient(String hostName, int port, boolean framed) throws ShuffleError + private CassandraClient getThriftClient(String hostName) throws ShuffleError { try { - return new CassandraClient(hostName, port, framed); + return new CassandraClient(hostName, thriftPort, thriftFramed, thriftUsername, thriftPassword); } - catch (TTransportException e) + catch (TException e) { throw new ShuffleError(String.format("Unable to connect to %s/%d: %s", hostName, port, e.getMessage())); } @@ -486,72 +453,51 @@ public class Shuffle extends AbstractJmxClient * Execute a CQL v3 query. * * @param hostName hostname or address to connect to - * @param port port number to connect to - * @param isFramed wrap with framed transport if true * @param cqlQuery CQL query string * @return a Thrift CqlResult instance * @throws ShuffleError */ - public static CqlResult executeCqlQuery(String hostName, int port, boolean isFramed, String cqlQuery) throws ShuffleError + private CqlResult executeCqlQuery(String hostName, String cqlQuery) throws ShuffleError { - CassandraClient client = null; - - try + try (CassandraClient client = getThriftClient(hostName)) { - client = getThriftClient(hostName, port, isFramed); return client.execute_cql_query(ByteBuffer.wrap(cqlQuery.getBytes()), Compression.NONE); } catch (UnavailableException e) { - throw new ShuffleError( - String.format("Unable to write shuffle entries to %s. Reason: UnavailableException", hostName)); + throw new ShuffleError(String.format("Unable to write shuffle entries to %s. Reason: UnavailableException", hostName)); } catch (TimedOutException e) { - throw new ShuffleError( - String.format("Unable to write shuffle entries to %s. Reason: TimedOutException", hostName)); + throw new ShuffleError(String.format("Unable to write shuffle entries to %s. Reason: TimedOutException", hostName)); } catch (Exception e) { throw new RuntimeException(e); } - finally - { - if (client != null) - client.close(); - } } /** * Return a partitioner instance for remote host. * - * @param hostName hostname or address to connect to - * @param port port number to connect to - * @param framed wrap with framed transport if true * @return an IPartitioner instance * @throws ShuffleError */ - public static IPartitioner getPartitioner(String hostName, int port, boolean framed) throws ShuffleError + private IPartitioner getPartitioner() throws ShuffleError { - String partitionerName = null; + String partitionerName; try { - partitionerName = getThriftClient(hostName, port, framed).describe_partitioner(); - } - catch (InvalidRequestException e) - { - throw new RuntimeException("Error calling describe_partitioner() defies explanation", e); + partitionerName = getThriftClient(thriftHost).describe_partitioner(); } catch (TException e) { - throw new ShuffleError( - String.format("Thrift request to %s:%d failed: %s", hostName, port, e.getMessage())); + throw new ShuffleError(String.format("Thrift request to %s:%d failed: %s", thriftHost, port, e.getMessage())); } try { - Class partitionerClass = Class.forName(partitionerName); - return (IPartitioner)partitionerClass.newInstance(); + return (IPartitioner) Class.forName(partitionerName).newInstance(); } catch (ClassNotFoundException e) { @@ -570,7 +516,7 @@ public class Shuffle extends AbstractJmxClient * @param partitioner an instance of the IPartitioner in use * @return a query string */ - public static String createShuffleBatchInsert(Collection tokens, IPartitioner partitioner) + private String createShuffleBatchInsert(Collection tokens, IPartitioner partitioner) { StringBuilder query = new StringBuilder(); query.append("BEGIN BATCH").append("\n"); @@ -635,9 +581,11 @@ public class Shuffle extends AbstractJmxClient String password = cmd.getOptionValue("password"); String thriftHost = (cmd.getOptionValue("thrift-host") != null) ? cmd.getOptionValue("thrift-host") : hostName; String thriftPort = (cmd.getOptionValue("thrift-port") != null) ? cmd.getOptionValue("thrift-port") : "9160"; + String thriftUsername = (cmd.getOptionValue("thrift-user") != null) ? cmd.getOptionValue("thrift-user") : null; + String thriftPassword = (cmd.getOptionValue("thrift-password") != null) ? cmd.getOptionValue("thrift-password") : null; String onlyDc = cmd.getOptionValue("only-dc"); - boolean thriftFramed = cmd.hasOption("thrift-framed") ? true : false; - boolean andEnable = cmd.hasOption("and-enable") ? true : false; + boolean thriftFramed = cmd.hasOption("thrift-framed"); + boolean andEnable = cmd.hasOption("and-enable"); int portNum = -1, thriftPortNum = -1; // Parse JMX port number @@ -654,7 +602,9 @@ public class Shuffle extends AbstractJmxClient } } else + { portNum = DEFAULT_JMX_PORT; + } // Parse Thrift port number if (thriftPort != null) @@ -670,10 +620,19 @@ public class Shuffle extends AbstractJmxClient } } else + { thriftPortNum = 9160; + } - Shuffle shuffler = new Shuffle(hostName, portNum, thriftHost, thriftPortNum, thriftFramed, - username, password); + Shuffle shuffler = new Shuffle(hostName, + portNum, + thriftHost, + thriftPortNum, + thriftFramed, + username, + password, + thriftUsername, + thriftPassword); try { @@ -706,62 +665,58 @@ public class Shuffle extends AbstractJmxClient System.exit(0); } -} - -/** A self-contained Cassandra.Client; Closeable. */ -class CassandraClient implements Closeable -{ - TTransport transport; - Cassandra.Client client; - CassandraClient(String hostName, int port, boolean framed) throws TTransportException + /** A self-contained Cassandra.Client; Closeable. */ + class CassandraClient implements Closeable { - TSocket socket = new TSocket(hostName, port); - transport = (framed) ? socket : new TFastFramedTransport(socket); - transport.open(); - client = new Cassandra.Client(new TBinaryProtocol(transport)); + TTransport transport; + Cassandra.Client client; - try + CassandraClient(String hostName, int port, boolean framed, String username, String password) throws TException { + TSocket socket = new TSocket(hostName, port); + transport = (framed) ? socket : new TFastFramedTransport(socket); + transport.open(); + client = new Cassandra.Client(new TBinaryProtocol(transport)); + + if (username != null && password != null) + { + AuthenticationRequest request = new AuthenticationRequest(); + request.putToCredentials("username", username); + request.putToCredentials("password", password); + client.login(request); + } + client.set_cql_version("3.0.0"); } - catch (Exception e) + + CqlResult execute_cql_query(ByteBuffer cqlQuery, Compression compression) throws Exception { - throw new RuntimeException(e); + return client.execute_cql3_query(cqlQuery, compression, ConsistencyLevel.ONE); } - } - - CqlResult execute_cql_query(ByteBuffer cqlQuery, Compression compression) throws Exception - { - return client.execute_cql3_query(cqlQuery, compression, ConsistencyLevel.ONE); - } - - String describe_partitioner() throws TException, InvalidRequestException - { - return client.describe_partitioner(); - } - List describe_ring(String keyspace) throws InvalidRequestException, TException - { - return client.describe_ring(keyspace); - } + String describe_partitioner() throws TException + { + return client.describe_partitioner(); + } - Map describe_token_map() throws InvalidRequestException, TException - { - return client.describe_token_map(); - } + Map describe_token_map() throws TException + { + return client.describe_token_map(); + } - public void close() - { - transport.close(); + public void close() + { + transport.close(); + } } -} -@SuppressWarnings("serial") -class ShuffleError extends Exception -{ - ShuffleError(String msg) + @SuppressWarnings("serial") + class ShuffleError extends Exception { - super(msg); + ShuffleError(String msg) + { + super(msg); + } } }