cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject git commit: fix BulkLoader recognition of CQL3 columnfamilies patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4755
Date Mon, 22 Oct 2012 16:48:20 GMT
Updated Branches:
  refs/heads/trunk f02928f88 -> a58b87020


fix BulkLoader recognition of CQL3 columnfamilies
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4755


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a58b8702
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a58b8702
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a58b8702

Branch: refs/heads/trunk
Commit: a58b87020b4315eab48482666450049fb7cf0674
Parents: f02928f
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Mon Oct 22 11:48:07 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Mon Oct 22 11:48:07 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/tools/BulkLoader.java     |   32 +++++++-------
 2 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a58b8702/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e14ffa7..4026428 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * fix BulkLoader recognition of CQL3 columnfamilies (CASSANDRA-4755)
  * Sort commitlog segments for replay by id instead of mtime (CASSANDRA-4793)
  * Make hint delivery asynchronous (CASSANDRA-4761)
  * Pluggable Thrift transport factories for CLI and cqlsh (CASSANDRA-4609, 4610)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a58b8702/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index f2f018f..c838cb0 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -23,15 +23,19 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.apache.commons.cli.*;
+
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.SystemTable;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.streaming.PendingFile;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.OutputHandler;
-import org.apache.commons.cli.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TFramedTransport;
@@ -162,7 +166,7 @@ public class BulkLoader
 
             sb.append("[total: ").append(totalSize == 0 ? 100L : totalProgress * 100L / totalSize).append("
- ");
             sb.append(mbPerSec(deltaProgress, deltaTime)).append("MB/s");
-            sb.append(" (avg: ").append(mbPerSec(totalProgress, time - startTime)).append("MB/s)]");;
+            sb.append(" (avg: ").append(mbPerSec(totalProgress, time - startTime)).append("MB/s)]");
             System.out.print(sb.toString());
             return done;
         }
@@ -176,7 +180,7 @@ public class BulkLoader
 
     static class ExternalClient extends SSTableLoader.Client
     {
-        private final Map<String, Set<String>> knownCfs = new HashMap<String,
Set<String>>();
+        private final Set<String> knownCfs = new HashSet<String>();
         private final Set<InetAddress> hosts;
         private final int rpcPort;
         private final String user;
@@ -198,17 +202,14 @@ public class BulkLoader
             {
                 try
                 {
-
                     // Query endpoint to ranges map and schemas from thrift
                     InetAddress host = hostiter.next();
                     Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort,
this.user, this.passwd);
-                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                    List<KsDef> ksDefs = client.describe_keyspaces();
 
                     setPartitioner(client.describe_partitioner());
                     Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
 
-                    for (TokenRange tr : tokenRanges)
+                    for (TokenRange tr : client.describe_ring(keyspace))
                     {
                         Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token),
tkFactory.fromString(tr.end_token));
                         for (String ep : tr.endpoints)
@@ -217,13 +218,13 @@ public class BulkLoader
                         }
                     }
 
-                    for (KsDef ksDef : ksDefs)
-                    {
-                        Set<String> cfs = new HashSet<String>();
-                        for (CfDef cfDef : ksDef.cf_defs)
-                            cfs.add(cfDef.name);
-                        knownCfs.put(ksDef.name, cfs);
-                    }
+                    String query = String.format("SELECT columnfamily_name FROM %s.%s WHERE
keyspace_name = '%s'",
+                                                 Table.SYSTEM_KS,
+                                                 SystemTable.SCHEMA_COLUMNFAMILIES_CF,
+                                                 keyspace);
+                    CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query),
Compression.NONE, ConsistencyLevel.ONE);
+                    for (CqlRow row : result.rows)
+                        knownCfs.add(new String(row.getColumns().get(0).getValue(), "UTF8"));
                     break;
                 }
                 catch (Exception e)
@@ -236,8 +237,7 @@ public class BulkLoader
 
         public boolean validateColumnFamily(String keyspace, String cfName)
         {
-            Set<String> cfs = knownCfs.get(keyspace);
-            return cfs != null && cfs.contains(cfName);
+            return knownCfs.contains(cfName);
         }
 
         private static Cassandra.Client createThriftClient(String host, int port, String
user, String passwd) throws Exception


Mime
View raw message