cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1044116 - in /cassandra/branches/cassandra-0.7: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassa...
Date Thu, 09 Dec 2010 20:19:41 GMT
Author: jbellis
Date: Thu Dec  9 20:19:40 2010
New Revision: 1044116

URL: http://svn.apache.org/viewvc?rev=1044116&view=rev
Log:
merge from 0.6

Added:
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java
Modified:
    cassandra/branches/cassandra-0.7/   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java

Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-1041242
+/cassandra/branches/cassandra-0.6:922689-1041242,1042824,1043070,1043268
 /cassandra/branches/cassandra-0.7:1035666
 /cassandra/trunk:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1041242,1042824,1043070,1043268
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1035666
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1041242,1042824,1043070,1043268
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1035666
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1041242,1042824,1043070,1043268
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1035666
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1041242,1042824,1043070,1043268
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1035666
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec  9 20:19:40 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1041242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1041242,1042824,1043070,1043268
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1035666
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Dec  9 20:19:40 2010
@@ -895,7 +895,7 @@ public class ColumnFamilyStore implement
         return maxFile;
     }
 
-    void forceCleanup() throws ExecutionException, InterruptedException
+    public void forceCleanup() throws ExecutionException, InterruptedException
     {
         CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Thu Dec  9 20:19:40 2010
@@ -60,8 +60,7 @@ public class IncomingTcpConnection exten
         {
             try
             {
-                MessagingService.validateMagic(input.readInt());
-                int header = input.readInt();
+                int header = readHeader(input);
                 int type = MessagingService.getBits(header, 1, 2);
                 boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
                 int version = MessagingService.getBits(header, 15, 8);
@@ -76,11 +75,7 @@ public class IncomingTcpConnection exten
                 }
                 else
                 {
-                    int size = input.readInt();
-                    byte[] contentBytes = new byte[size];
-                    input.readFully(contentBytes);
-                    
-                    Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+                    Message message = readMessage(input);
                     MessagingService.receive(message);
                 }
             }
@@ -101,6 +96,21 @@ public class IncomingTcpConnection exten
         close();
     }
 
+    static int readHeader(DataInputStream in) throws IOException
+    {
+        MessagingService.validateMagic(in.readInt());
+        return in.readInt();
+    }
+
+    static Message readMessage(DataInputStream in) throws IOException
+    {
+        int size = in.readInt();
+        byte[] contentBytes = new byte[size];
+        in.readFully(contentBytes);
+
+        return Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
+    }
+
     private void close()
     {
         try

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Thu Dec  9 20:19:40 2010
@@ -310,6 +310,13 @@ public class MessagingService implements
         // get pooled connection (really, connection queue)
         OutboundTcpConnection connection = getConnection(to, message);
 
+        // write it
+        ByteBuffer buffer = serialize(message);
+        connection.write(buffer);
+    }
+
+    static ByteBuffer serialize(Message message)
+    {
         // pack message with header in a bytebuffer
         byte[] data;
         try
@@ -323,12 +330,9 @@ public class MessagingService implements
             throw new RuntimeException(e);
         }
         assert data.length > 0;
-        ByteBuffer buffer = packIt(data , false);
-
-        // write it
-        connection.write(buffer);
+        return packIt(data , false);
     }
-    
+
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java Thu Dec  9 20:19:40 2010
@@ -1175,32 +1175,23 @@ public class StorageService implements I
         return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getLocalAddress());
     }
 
-    public void forceTableCleanup() throws IOException, ExecutionException, InterruptedException
+    public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        List<String> tables = DatabaseDescriptor.getNonSystemTables();
-        for (String tName : tables)
+        if (tableName.equals("system"))
+            throw new RuntimeException("Cleanup of the system table is neither necessary nor wise");
+                    
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
         {
-            Table table = Table.open(tName);
-            table.forceCleanup();
+            cfStore.forceCleanup();
         }
     }
 
-    public void forceTableCleanup(String tableName) throws IOException, ExecutionException, InterruptedException
-    {
-        Table table = getValidTable(tableName);
-        table.forceCleanup();
-    }
-
-    public void forceTableCompaction() throws IOException, ExecutionException, InterruptedException
-    {
-        for (Table table : Table.all())
-            table.forceCompaction();
-    }
-
-    public void forceTableCompaction(String tableName) throws IOException, ExecutionException, InterruptedException
+    public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        Table table = getValidTable(tableName);
-        table.forceCompaction();
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
+        {
+            cfStore.forceMajorCompaction();
+        }
     }
 
     /**
@@ -2074,4 +2065,10 @@ public class StorageService implements I
         return partitioner_.describeOwnership(sortedTokens);
     }
 
+    public List<String> getKeyspaces()
+    {
+        List<String> tableslist = new ArrayList<String>(DatabaseDescriptor.getTables());
+        return Collections.unmodifiableList(tableslist);
+    }
+
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageServiceMBean.java Thu Dec  9 20:19:40 2010
@@ -131,26 +131,6 @@ public interface StorageServiceMBean
     public List<InetAddress> getNaturalEndpoints(String table, byte[] key);
 
     /**
-     * Forces major compaction (all sstable files compacted)
-     */
-    public void forceTableCompaction() throws IOException, ExecutionException, InterruptedException;
-
-    /**
-     * Forces major compaction of a single keyspace
-     */
-    public void forceTableCompaction(String tableName) throws IOException, ExecutionException,
InterruptedException;
-
-    /**
-     * Trigger a cleanup of keys on all tables.
-     */
-    public void forceTableCleanup() throws IOException, ExecutionException, InterruptedException;
-
-    /**
-     * Trigger a cleanup of keys on a single keyspace
-     */
-    public void forceTableCleanup(String tableName) throws IOException, ExecutionException,
InterruptedException;
-
-    /**
      * Takes the snapshot for a given table.
      * 
      * @param tableName the name of the table.
@@ -171,6 +151,16 @@ public interface StorageServiceMBean
     public void clearSnapshot() throws IOException;
 
     /**
+     * Forces major compaction of a single keyspace
+     */
+    public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException,
ExecutionException, InterruptedException;
+
+    /**
+     * Trigger a cleanup of keys on a single keyspace
+     */
+    public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException,
ExecutionException, InterruptedException;
+
+    /**
      * Flush all memtables for the given column families, or all columnfamilies for the given
table
      * if none are explicitly listed.
      * @param tableName
@@ -270,4 +260,6 @@ public interface StorageServiceMBean
      *   a mapping from "token -> %age of cluster owned by that token"
      */
     public Map<Token, Float> getOwnership();
+
+    public List<String> getKeyspaces();
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeCmd.java Thu Dec  9 20:19:40 2010
@@ -30,6 +30,8 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.commons.cli.*;
 
@@ -64,6 +66,14 @@ public class NodeCmd {
     {
         this.probe = probe;
     }
+
+    public enum NodeCommand {
+        RING, INFO, CFSTATS, SNAPSHOT, CLEARSNAPSHOT, VERSION, TPSTATS, FLUSH, DRAIN,
+        DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT,
+        SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
+        COMPACTIONSTATS
+    }
+
     
     /**
      * Prints usage information to stdout.
@@ -71,12 +81,37 @@ public class NodeCmd {
     private static void printUsage()
     {
         HelpFormatter hf = new HelpFormatter();
-        String header = String.format(
-                "%nAvailable commands: ring, info, version, cleanup, compact [keyspacename],
cfstats, snapshot [snapshotname], " +
-                "clearsnapshot, tpstats, flush, drain, repair, decommission, move, loadbalance,
removetoken [status|force]|[token], " +
-                "setcachecapacity [keyspace] [cfname] [keycachecapacity] [rowcachecapacity],
" +
-                "getcompactionthreshold [keyspace] [cfname], setcompactionthreshold [cfname]
[minthreshold] [maxthreshold], " +
-                "netstats [host], cfhistograms <keyspace> <column_family>, compactionstats");
+        String header = "\nAvailable commands:\n"
+                         // No args
+                         + "ring\n"
+                         + "info\n"
+                         + "cfstats\n"
+                         + "clearsnapshot\n"
+                         + "version\n"
+                         + "tpstats\n"
+                         + "drain\n"
+                         + "decommission\n"
+                         + "loadbalance\n"
+                         + "compactionstats\n"
+
+                         // One arg
+                         + "snapshot [snapshotname]\n"
+                         + "netstats [host]\n"
+                         + "move <new token>\n"
+                         + "removetoken status|force|<token>\n"
+
+                         // Two args
+                         + "flush [keyspace] [cfnames]\n"
+                         + "repair [keyspace] [cfnames]\n"
+                         + "cleanup [keyspace] [cfnames]\n"
+                         + "compact [keyspace] [cfnames]\n"
+                         + "getcompactionthreshold <keyspace> <cfname>\n"
+                         + "cfhistograms <keyspace> <cfname>\n"
+
+                         // Four args
+                         + "setcachecapacity <keyspace> <cfname> <keycachecapacity>
<rowcachecapacity>\n"
+                         + "setcompactionthreshold <keyspace> <cfname> <minthreshold>
<maxthreshold>\n";
+
         String usage = String.format("java %s --host <arg> <command>%n", NodeCmd.class.getName());
         hf.printHelp(usage, "", options, header);
     }
@@ -419,9 +454,7 @@ public class NodeCmd {
         }
         catch (ParseException parseExcep)
         {
-            System.err.println(parseExcep);
-            printUsage();
-            System.exit(1);
+            badUse(parseExcep.toString());
         }
 
         String host = cmd.getOptionValue(HOST_OPT_LONG);
@@ -447,248 +480,177 @@ public class NodeCmd {
         }
         catch (IOException ioe)
         {
-            System.err.println("Error connecting to remote JMX agent!");
-            ioe.printStackTrace();
-            System.exit(3);
+            err(ioe, "Error connection to remote JMX agent!");
         }
         
         if (cmd.getArgs().length < 1)
-        {
-            System.err.println("Missing argument for command.");
-            printUsage();
-            System.exit(1);
-        }
-        
+            badUse("Missing argument for command.");
+
         NodeCmd nodeCmd = new NodeCmd(probe);
         
         // Execute the requested command.
         String[] arguments = cmd.getArgs();
         String cmdName = arguments[0];
-        if (cmdName.equals("ring"))
-        {
-            nodeCmd.printRing(System.out);
-        }
-        else if (cmdName.equals("info"))
-        {
-            nodeCmd.printInfo(System.out);
-        }
-        else if (cmdName.equals("cleanup"))
-        {
-            try
-            {
-                if (arguments.length > 1)
-                    probe.forceTableCleanup(arguments[1]);
-                else
-                    probe.forceTableCleanup();
-            }
-            catch (ExecutionException ee)
-            {
-                System.err.println("Error occured during Keyspace cleanup");
-                ee.printStackTrace();
-                System.exit(3);
-            }
-        }
-        else if (cmdName.equals("compact"))
-        {
-            try
-            {
-                if (arguments.length > 1)
-                    probe.forceTableCompaction(arguments[1]);
-                else
-                    probe.forceTableCompaction();
-            }
-            catch (ExecutionException ee)
-            {
-                System.err.println("Error occured during Keyspace compaction");
-                ee.printStackTrace();
-                System.exit(3);
-            }
-        }
-        else if (cmdName.equals("compactionstats"))
-        {
-            nodeCmd.printCompactionStats(System.out);
-        }
-        else if (cmdName.equals("cfstats"))
-        {
-            nodeCmd.printColumnFamilyStats(System.out);
-        }
-        else if (cmdName.equals("decommission"))
-        {
-            probe.decommission();
-        }
-        else if (cmdName.equals("loadbalance"))
-        {
-            probe.loadBalance();
-        }
-        else if (cmdName.equals("move"))
-        {
-            if (arguments.length <= 1)
-            {
-                System.err.println("missing token argument");
-            }
-            probe.move(arguments[1]);
-        }
-        else if (cmdName.equals("removetoken"))
-        {
-            if (arguments.length <= 1)
-            {
-                System.err.println("Missing an argument.");
-                printUsage();
-            }
-            else if (arguments[1].equals("status"))
-            {
-                nodeCmd.printRemovalStatus(System.out);
-            }
-            else if (arguments[1].equals("force"))
-            {
-                nodeCmd.printRemovalStatus(System.out);
-                probe.forceRemoveCompletion();
-            }
-            else
-                probe.removeToken(arguments[1]);
 
-        }
-        else if (cmdName.equals("snapshot"))
-        {
-            String snapshotName = "";
-            if (arguments.length > 1)
-            {
-                snapshotName = arguments[1];
-            }
-            probe.takeSnapshot(snapshotName);
-        }
-        else if (cmdName.equals("clearsnapshot"))
+        boolean validCommand = false;
+        for (NodeCommand n : NodeCommand.values())
         {
-            probe.clearSnapshot();
+            if (cmdName.toUpperCase().equals(n.name()))
+                validCommand = true;
         }
-        else if (cmdName.equals("tpstats"))
-        {
-            nodeCmd.printThreadPoolStats(System.out);
+
+        if (!validCommand)
+            badUse("Unrecognized command: " + cmdName);
+
+        NodeCommand nc = NodeCommand.valueOf(cmdName.toUpperCase());
+        switch (nc)
+        {
+            case RING            : nodeCmd.printRing(System.out); break;
+            case INFO            : nodeCmd.printInfo(System.out); break;
+            case CFSTATS         : nodeCmd.printColumnFamilyStats(System.out); break;
+            case DECOMMISSION    : probe.decommission(); break;
+            case LOADBALANCE     : probe.loadBalance(); break;
+            case CLEARSNAPSHOT   : probe.clearSnapshot(); break;
+            case TPSTATS         : nodeCmd.printThreadPoolStats(System.out); break;
+            case VERSION         : nodeCmd.printReleaseVersion(System.out); break;
+            case COMPACTIONSTATS : nodeCmd.printCompactionStats(System.out); break;
+
+            case DRAIN :
+                try { probe.drain(); }
+                catch (ExecutionException ee) { err(ee, "Error occured during flushing");
}
+                break;
+
+            case NETSTATS :
+                if (arguments.length > 1) { nodeCmd.printNetworkStats(InetAddress.getByName(arguments[1]),
System.out); }
+                else                      { nodeCmd.printNetworkStats(null, System.out);
}
+                break;
+
+            case SNAPSHOT :
+                if (arguments.length > 1) { probe.takeSnapshot(arguments[1]); }
+                else                      { probe.takeSnapshot(""); }
+                break;
+
+            case MOVE :
+                if (arguments.length != 2) { badUse("Missing token argument for move.");
}
+                probe.move(arguments[1]);
+                break;
+
+            case REMOVETOKEN :
+                if (arguments.length != 2) { badUse("Missing an argument for removetoken
(either status, force, or a token)"); }
+                else if (arguments[1].equals("status")) { nodeCmd.printRemovalStatus(System.out);
}
+                else if (arguments[1].equals("force"))  { nodeCmd.printRemovalStatus(System.out);
probe.forceRemoveCompletion(); }
+                else                                    { probe.removeToken(arguments[1]);
}
+                break;
+
+            case CLEANUP :
+            case COMPACT :
+            case REPAIR  :
+            case FLUSH   :
+                optionalKSandCFs(nc, arguments, probe);
+                break;
+
+            case GETCOMPACTIONTHRESHOLD :
+                if (arguments.length != 3) { badUse("getcompactionthreshold requires ks and
cf args."); }
+                probe.getCompactionThreshold(System.out, arguments[1], arguments[2]);
+                break;
+
+            case CFHISTOGRAMS :
+                if (arguments.length != 3) { badUse("cfhistograms requires ks and cf args");
}
+                nodeCmd.printCfHistograms(arguments[1], arguments[2], System.out);
+                break;
+
+            case SETCACHECAPACITY :
+                if (arguments.length != 5) { badUse("setcachecapacity requires ks, cf, keycachecap,
and rowcachecap args."); }
+                probe.setCacheCapacities(arguments[1], arguments[2], Integer.valueOf(arguments[3]),
Integer.valueOf(arguments[4]));
+                break;
+
+            case SETCOMPACTIONTHRESHOLD :
+                if (arguments.length != 5) { badUse("setcompactionthreshold requires ks,
cf, min, and max threshold args."); }
+                int minthreshold = Integer.parseInt(arguments[3]);
+                int maxthreshold = Integer.parseInt(arguments[4]);
+                if ((minthreshold < 0) || (maxthreshold < 0)) { badUse("Thresholds
must be positive integers"); }
+                if (minthreshold > maxthreshold)              { badUse("Min threshold
cannot be greater than max."); }
+                if (minthreshold < 2 && maxthreshold != 0)    { badUse("Min threshold
must be at least 2"); }
+                probe.setCompactionThreshold(arguments[1], arguments[2], minthreshold, maxthreshold);
+                break;
+
+            default :
+                throw new RuntimeException("Unreachable code.");
+
         }
-        else if (cmdName.equals("flush") || cmdName.equals("repair"))
-        {
-            if (cmd.getArgs().length < 2)
-            {
-                System.err.println("Missing keyspace argument.");
-                printUsage();
-                System.exit(1);
-            }
 
-            String[] columnFamilies = new String[cmd.getArgs().length - 2];
-            for (int i = 0; i < columnFamilies.length; i++)
+        System.exit(0);
+    }
+
+    private static void badUse(String useStr)
+    {
+        System.err.println(useStr);
+        printUsage();
+        System.exit(1);
+    }
+
+    private static void err(Exception e, String errStr)
+    {
+        System.err.println(errStr);
+        e.printStackTrace();
+        System.exit(3);
+    }
+
+    private static void optionalKSandCFs(NodeCommand nc, String[] cmdArgs, NodeProbe probe)
throws InterruptedException, IOException
+    {
+        // Per-keyspace
+        if (cmdArgs.length == 1)
+        {
+            for (String keyspace : probe.getKeyspaces())
             {
-                columnFamilies[i] = cmd.getArgs()[i + 2];
-            }
-            if (cmdName.equals("flush"))
-                try
-                {
-                    probe.forceTableFlush(cmd.getArgs()[1], columnFamilies);
-                } catch (ExecutionException ee)
+                switch (nc)
                 {
-                    System.err.println("Error occured during flushing");
-                    ee.printStackTrace();
-                    System.exit(3);
+                    case REPAIR  : probe.forceTableRepair(keyspace); break;
+                    case FLUSH   :
+                        try { probe.forceTableFlush(keyspace); }
+                        catch (ExecutionException ee) { err(ee, "Error occured while flushing
keyspace " + keyspace); }
+                        break;
+                    case COMPACT :
+                        try { probe.forceTableCompaction(keyspace); }
+                        catch (ExecutionException ee) { err(ee, "Error occured while compacting
keyspace " + keyspace); }
+                        break;
+                    case CLEANUP :
+                        if (keyspace.equals("system")) { break; } // Skip cleanup on system
cfs.
+                        try { probe.forceTableCleanup(keyspace); }
+                        catch (ExecutionException ee) { err(ee, "Error occured while cleaning
up keyspace " + keyspace); }
+                        break;
+                    default:
+                        throw new RuntimeException("Unreachable code.");
                 }
-            else // cmdName.equals("repair")
-                probe.forceTableRepair(cmd.getArgs()[1], columnFamilies);
-        }
-        else if (cmdName.equals("drain"))
-        {
-            try 
-            {
-                probe.drain();
-            } catch (ExecutionException ee) 
-            {
-                System.err.println("Error occured during flushing");
-                ee.printStackTrace();
-                System.exit(3);
-            }    	
-        }
-        else if (cmdName.equals("setcachecapacity"))
-        {
-            if (cmd.getArgs().length != 5) // ks cf keycachecap rowcachecap
-            {
-                System.err.println("cacheinfo requires: Keyspace name, ColumnFamily name, key cache capacity (in keys), and row cache capacity (in rows)");
             }
-            String tableName = cmd.getArgs()[1];
-            String cfName = cmd.getArgs()[2];
-            int keyCacheCapacity = Integer.valueOf(cmd.getArgs()[3]);
-            int rowCacheCapacity = Integer.valueOf(cmd.getArgs()[4]);
-            probe.setCacheCapacities(tableName, cfName, keyCacheCapacity, rowCacheCapacity);
         }
-        else if (cmdName.equals("getcompactionthreshold"))
-        {
-            if (arguments.length < 3) // ks cf
-            {
-                System.err.println("Missing keyspace/cfname");
-                printUsage();
-                System.exit(1);
-            }
-            probe.getCompactionThreshold(System.out, cmd.getArgs()[1], cmd.getArgs()[2]);
-        }
-        else if (cmdName.equals("setcompactionthreshold"))
+        // Per-cf (or listed cfs) in given keyspace
+        else
         {
-            if (cmd.getArgs().length != 5) // ks cf min max
-            {
-                System.err.println("setcompactionthreshold requires: Keyspace name, ColumnFamily name, " +
-                                   "min threshold, and max threshold.");
-                printUsage();
-                System.exit(1);
-            }
-            String ks = cmd.getArgs()[1];
-            String cf = cmd.getArgs()[2];
-            int minthreshold = Integer.parseInt(arguments[3]);
-            int maxthreshold = Integer.parseInt(arguments[4]);
-
-            if ((minthreshold < 0) || (maxthreshold < 0))
-            {
-                System.err.println("Thresholds must be positive integers.");
-                printUsage();
-                System.exit(1);
-            }
-
-            if (minthreshold > maxthreshold)
-            {
-                System.err.println("Min threshold can't be greater than Max threshold");
-                printUsage();
-                System.exit(1);
-            }
-
-            if (minthreshold < 2 && maxthreshold != 0)
+            String keyspace = cmdArgs[1];
+            String[] columnFamilies = new String[cmdArgs.length - 2];
+            for (int i = 0; i < columnFamilies.length; i++)
             {
-                System.err.println("Min threshold must be at least 2");
-                printUsage();
-                System.exit(1);
+                columnFamilies[i] = cmdArgs[i + 2];
             }
-            probe.setCompactionThreshold(ks, cf, minthreshold, maxthreshold);
-        }
-        else if (cmdName.equals("netstats"))
-        {
-            // optional host
-            String otherHost = arguments.length > 1 ? arguments[1] : null;
-            nodeCmd.printNetworkStats(otherHost == null ? null : InetAddress.getByName(otherHost), System.out);
-        }
-        else if (cmdName.equals("cfhistograms"))
-        {
-            if (arguments.length < 3)
+            switch (nc)
             {
-                System.err.println("Usage of cfhistograms: <keyspace> <column_family>.");
-                System.exit(1);
+                case REPAIR  : probe.forceTableRepair(keyspace, columnFamilies); break;
+                case FLUSH   :
+                    try { probe.forceTableFlush(keyspace, columnFamilies); }
+                    catch (ExecutionException ee) { err(ee, "Error occured during flushing"); }
+                    break;
+                case COMPACT :
+                    try { probe.forceTableCompaction(keyspace, columnFamilies); }
+                    catch (ExecutionException ee) { err(ee, "Error occured during compaction"); }
+                    break;
+                case CLEANUP :
+                    try { probe.forceTableCleanup(keyspace, columnFamilies); }
+                    catch (ExecutionException ee) { err(ee, "Error occured during cleanup"); }
+                    break;
+                default:
+                    throw new RuntimeException("Unreachable code.");
             }
-
-            nodeCmd.printCfHistograms(arguments[1], arguments[2], System.out);
         }
-        else if (cmdName.equals("version"))
-        {
-            nodeCmd.printReleaseVersion(System.out);
-        }
-        else
-        {
-            System.err.println("Unrecognized command: " + cmdName + ".");
-            printUsage();
-            System.exit(1);
-        }
-
-        System.exit(0);
     }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/tools/NodeProbe.java Thu Dec  9 20:19:40 2010
@@ -143,24 +143,14 @@ public class NodeProbe
         jmxc.close();
     }
 
-    public void forceTableCleanup() throws IOException, ExecutionException, InterruptedException
+    public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        ssProxy.forceTableCleanup();
+        ssProxy.forceTableCleanup(tableName, columnFamilies);
     }
 
-    public void forceTableCleanup(String tableName) throws IOException, ExecutionException, InterruptedException
+    public void forceTableCompaction(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        ssProxy.forceTableCleanup(tableName);
-    }
-
-    public void forceTableCompaction() throws IOException, ExecutionException, InterruptedException
-    {
-        ssProxy.forceTableCompaction();
-    }
-
-    public void forceTableCompaction(String tableName) throws IOException, ExecutionException, InterruptedException
-    {
-        ssProxy.forceTableCompaction(tableName);
+        ssProxy.forceTableCompaction(tableName, columnFamilies);
     }
 
     public void forceTableFlush(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
@@ -489,6 +479,11 @@ public class NodeProbe
 
         return cfsProxy;
     }
+
+    public List<String> getKeyspaces()
+    {
+        return ssProxy.getKeyspaces();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>
@@ -553,5 +548,5 @@ class ThreadPoolProxyMBeanIterator imple
     public void remove()
     {
         throw new UnsupportedOperationException();
-    }   
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1044116&r1=1044115&r2=1044116&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Dec  9 20:19:40 2010
@@ -342,9 +342,9 @@ public class FBUtilities
     public static String bytesToHex(ByteBuffer bytes)
     {
         StringBuilder sb = new StringBuilder();
-        for (int i=bytes.position()+bytes.arrayOffset(); i<bytes.limit()+bytes.arrayOffset(); i++)
+        for (int i = bytes.position(); i < bytes.limit(); i++)
         {
-            int bint = bytes.array()[i] & 0xff;
+            int bint = bytes.get(i) & 0xff;
             if (bint <= 0xF)
                 // toHexString does not 0 pad its results.
                 sb.append("0");

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java?rev=1044116&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/net/MessageSerializerTest.java Thu Dec  9 20:19:40 2010
@@ -0,0 +1,44 @@
+package org.apache.cassandra.net;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+
+public class MessageSerializerTest extends SchemaLoader
+{
+    @Test
+    public void testDeserialize() throws IOException
+    {
+        String wire = "ca552dfa0000010000000080000131047f00000100000000000000000000004800094b657973706163653100046b65793100000001000003e801000003e8800000008000000000000000000000010007436f6c756d6e310000000000000000000000000461736466000000000000000000000000000000000000000000000000000000000000000000000000";
+        byte[] bytes = FBUtilities.hexToBytes(wire);
+        check(new DataInputStream(new ByteArrayInputStream(bytes)));
+    }
+
+    private void check(DataInputStream in) throws IOException
+    {
+        IncomingTcpConnection.readHeader(in);
+        IncomingTcpConnection.readMessage(in);
+    }
+
+    @Test
+    public void testRoundTrip() throws IOException
+    {
+        RowMutation rm;
+        rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("key1"));
+        rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes("Column1")), ByteBufferUtil.bytes("asdf"), 0);
+        Message message = rm.makeRowMutationMessage();
+        ByteBuffer bb = MessagingService.serialize(message);
+        check(new DataInputStream(new ByteArrayInputStream(bb.array(), bb.position() + bb.arrayOffset(), bb.remaining())));
+    }
+}



Mime
View raw message