cassandra-commits mailing list archives

From: jbel...@apache.org
Subject: svn commit: r1059145 - in /cassandra/trunk: ./ conf/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ s...
Date: Fri, 14 Jan 2011 19:53:01 GMT
Author: jbellis
Date: Fri Jan 14 19:53:00 2011
New Revision: 1059145

URL: http://svn.apache.org/viewvc?rev=1059145&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
    cassandra/trunk/test/resources/UnsortedSuperCF.json
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/conf/cassandra.yaml
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
    cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
    cassandra/trunk/test/resources/SimpleCF.json
    cassandra/trunk/test/resources/SuperCF.json
    cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 14 19:53:00 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1057933
+/cassandra/branches/cassandra-0.7:1026516-1059144
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/conf/cassandra.yaml?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/conf/cassandra.yaml (original)
+++ cassandra/trunk/conf/cassandra.yaml Fri Jan 14 19:53:00 2011
@@ -109,12 +109,16 @@ seed_provider:
 # non-mmapped i/o.)
 disk_access_mode: auto
 
-# Unlike most systems, in Cassandra writes are faster than reads, so
-# you can afford more of those in parallel.  A good rule of thumb is 2
-# concurrent reads per processor core.  Increase ConcurrentWrites to
-# the number of clients writing at once if you enable CommitLogSync +
-# CommitLogSyncDelay. -->
-concurrent_reads: 8
+# For workloads with more data than can fit in memory, Cassandra's
+# bottleneck will be reads that need to fetch data from
+# disk. "concurrent_reads" should be set to (16 * number_of_drives) in
+# order to allow the operations to enqueue low enough in the stack
+# that the OS and drives can reorder them.
+#
+# On the other hand, since writes are almost never IO bound, the ideal
+# number of "concurrent_writes" is dependent on the number of cores in
+# your system; (8 * number_of_cores) is a good rule of thumb.
+concurrent_reads: 32
 concurrent_writes: 32
 
 # This sets the amount of memtable flush writer threads.  These will

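As a worked example of the tuning rule in the new comment (a sketch only; the drive and core counts below are hypothetical, not taken from this commit):

    // Worked example of the rule of thumb above, for an assumed node
    // with 2 data drives and 4 CPU cores.
    public class ConcurrencyRuleOfThumb
    {
        public static void main(String[] args)
        {
            int numberOfDrives = 2, numberOfCores = 4;   // hypothetical hardware
            int concurrentReads  = 16 * numberOfDrives;  // 32: queues reads deep enough for OS/drive reordering
            int concurrentWrites = 8  * numberOfCores;   // 32: writes are almost never IO bound
            System.out.printf("concurrent_reads: %d%nconcurrent_writes: %d%n",
                              concurrentReads, concurrentWrites);
        }
    }
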
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 14 19:53:00 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1057933
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1059144
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 14 19:53:00 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1057933
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1059144
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 14 19:53:00 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1057933
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1059144
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 14 19:53:00 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1057933
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1059144
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 14 19:53:00 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1057933
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1059144
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jan 14 19:53:00 2011
@@ -1723,6 +1723,11 @@ public class ColumnFamilyStore implement
         ssTables.getRowCache().clear();
     }
 
+    public void invalidateKeyCache()
+    {
+        ssTables.getKeyCache().clear();
+    }
+
     public int getRowCacheCapacity()
     {
         return ssTables.getRowCache().getCapacity();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java Fri Jan 14 19:53:00 2011
@@ -146,10 +146,16 @@ public interface ColumnFamilyStoreMBean
     public void forceMajorCompaction() throws ExecutionException, InterruptedException;
 
     /**
+     * invalidate the key cache; for use after invalidating row cache
+     */
+    public void invalidateKeyCache();
+
+    /**
      * invalidate the row cache; for use after bulk loading via BinaryMemtable
      */
     public void invalidateRowCache();
 
+
     /**
      * return the size of the smallest compacted row
      * @return

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/ApplicationState.java Fri Jan 14 19:53:00 2011
@@ -26,6 +26,8 @@ public enum ApplicationState
     STATUS,
     LOAD,
     SCHEMA,
+    DC,
+    RACK,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/EndpointState.java Fri Jan 14 19:53:00 2011
@@ -36,6 +36,8 @@ import org.cliffc.high_scale_lib.NonBloc
 
 public class EndpointState
 {
+    protected static Logger logger = LoggerFactory.getLogger(EndpointState.class);
+
     private final static ICompactSerializer<EndpointState> serializer_ = new EndpointStateSerializer();
 
     volatile HeartBeatState hbState_;

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Fri Jan 14 19:53:00 2011
@@ -433,7 +433,7 @@ public class Gossiper implements IFailur
         }
     }
 
-    EndpointState getEndpointStateForEndpoint(InetAddress ep)
+    public EndpointState getEndpointStateForEndpoint(InetAddress ep)
     {
         return endpointStateMap_.get(ep);
     }
@@ -848,6 +848,9 @@ public class Gossiper implements IFailur
             endpointStateMap_.put(localEndpoint_, localState);
         }
 
+        //notify snitches that Gossiper is about to start
+        DatabaseDescriptor.getEndpointSnitch().gossiperStarting();
+
         scheduledGossipTask = StorageService.scheduledTasks.scheduleWithFixedDelay(new GossipTask(),
                                                                                    Gossiper.intervalInMillis_,
                                                                                    Gossiper.intervalInMillis_,

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/VersionedValue.java Fri Jan 14 19:53:00 2011
@@ -132,6 +132,16 @@ public class VersionedValue implements C
                                         + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
+        public VersionedValue datacenter(String dcId)
+        {
+            return new VersionedValue(dcId);
+        }
+
+        public VersionedValue rack(String rackId)
+        {
+            return new VersionedValue(rackId);
+        }
+
     }
 
     private static class VersionedValueSerializer implements ICompactSerializer<VersionedValue>

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java Fri Jan 14 19:53:00 2011
@@ -37,4 +37,9 @@ public abstract class AbstractEndpointSn
     {
         return a1.getHostAddress().compareTo(a2.getHostAddress());
     }
+
+    public void gossiperStarting()
+    {
+        //noop by default
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Fri Jan 14 19:53:00 2011
@@ -105,6 +105,12 @@ public class DynamicEndpointSnitch exten
         }
     }
 
+    @Override
+    public void gossiperStarting()
+    {
+        subsnitch.gossiperStarting();
+    }
+
     public String getRack(InetAddress endpoint)
     {
         return subsnitch.getRack(endpoint);
@@ -302,5 +308,4 @@ class AdaptiveLatencyTracker extends Abs
         }
         return log;
     }
-
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java?rev=1059145&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2Snitch.java Fri Jan 14 19:53:00 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.io.DataInputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetAddress;
+import java.net.URL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A snitch that assumes an EC2 region is a DC and an EC2 availability_zone
+ *  is a rack. This information is available in the config for the node.
+ */
+public class Ec2Snitch extends AbstractNetworkTopologySnitch
+{
+    protected static Logger logger = LoggerFactory.getLogger(Ec2Snitch.class);
+    protected String ec2zone;
+    protected String ec2region;
+
+    public Ec2Snitch() throws IOException, ConfigurationException
+    {
+        // Populate the region and zone by introspection, fail if 404 on metadata
+        HttpURLConnection conn = (HttpURLConnection) new URL("http://169.254.169.254/latest/meta-data/placement/availability-zone").openConnection();
+        conn.setRequestMethod("GET");
+        if (conn.getResponseCode() != 200)
+        {
+            throw new ConfigurationException("Ec2Snitch was unable to find region/zone data. Not an ec2 node?");
+        }
+
+        // Read the information. I wish I could say (String) conn.getContent() here...
+        int cl = conn.getContentLength();
+        byte[] b = new byte[cl];
+        DataInputStream d = new DataInputStream((FilterInputStream)conn.getContent());
+        d.readFully(b);
+
+        // Split "us-east-1a" or "asia-1a" into "us-east"/"1a" and "asia"/"1a".
+        String azone = new String(b ,"UTF-8");
+        String[] splits = azone.split("-");
+        ec2zone = splits[splits.length - 1];
+        ec2region = splits.length < 3 ? splits[0] : splits[0]+"-"+splits[1];
+        logger.info("EC2Snitch using region: " + ec2region + ", zone: " + ec2zone + ".");
+    }
+
+    public String getRack(InetAddress endpoint)
+    {
+        if (endpoint == FBUtilities.getLocalAddress())
+            return ec2zone;
+        else
+            return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.RACK).value;
+    }
+
+    public String getDatacenter(InetAddress endpoint)
+    {
+        if (endpoint == FBUtilities.getLocalAddress())
+            return ec2region;
+        else
+            return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.DC).value;
+    }
+
+    @Override
+    public void gossiperStarting()
+    {
+        // Share EC2 info via gossip.  We have to wait until Gossiper is initialized though.
+        logger.info("Ec2Snitch adding ApplicationState ec2region=" + ec2region + " ec2zone=" + ec2zone);
+        Gossiper.instance.addLocalApplicationState(ApplicationState.DC, StorageService.valueFactory.datacenter(ec2region));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.RACK, StorageService.valueFactory.rack(ec2zone));
+    }
+}

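The constructor above derives rack and datacenter by splitting the availability-zone string on '-'. A small runnable sketch of that parsing with a hypothetical zone value (the real snitch reads it from the EC2 metadata URL):

    // Mirrors the split in Ec2Snitch's constructor; "us-east-1a" is an example input.
    public class ZoneSplitExample
    {
        public static void main(String[] args)
        {
            String azone = "us-east-1a";
            String[] splits = azone.split("-");                 // ["us", "east", "1a"]
            String ec2zone = splits[splits.length - 1];         // "1a"      -> rack
            String ec2region = splits.length < 3
                             ? splits[0]
                             : splits[0] + "-" + splits[1];     // "us-east" -> datacenter
            System.out.println(ec2region + " / " + ec2zone);
        }
    }

Selecting the snitch would be done the usual way, via endpoint_snitch: org.apache.cassandra.locator.Ec2Snitch in cassandra.yaml (standard snitch configuration, not part of this diff).
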
Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/IEndpointSnitch.java Fri Jan 14 19:53:00 2011
@@ -54,4 +54,9 @@ public interface IEndpointSnitch
      * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would
      */
     public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2);
+
+    /**
+     * called after Gossiper instance exists immediately before it starts gossiping
+     */
+    public void gossiperStarting();
 }
\ No newline at end of file

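Since gossiperStarting() is now part of IEndpointSnitch, custom snitches have to provide it; extending AbstractEndpointSnitch (or AbstractNetworkTopologySnitch, as Ec2Snitch does) inherits the no-op default added above. A minimal sketch with a hypothetical class name and hard-coded topology:

    package org.apache.cassandra.locator;

    import java.net.InetAddress;

    // Hypothetical snitch showing where the new hook fits; values are examples only.
    public class ExampleSnitch extends AbstractNetworkTopologySnitch
    {
        public String getRack(InetAddress endpoint)       { return "rack1"; }
        public String getDatacenter(InetAddress endpoint) { return "dc1";   }

        @Override
        public void gossiperStarting()
        {
            // Gossiper.instance exists by this point, so per-node state such as
            // ApplicationState.DC / RACK can be published here (see Ec2Snitch above).
        }
    }
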
Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jan 14 19:53:00 2011
@@ -444,7 +444,7 @@ public final class MessagingService impl
     
     public IMessageCallback removeRegisteredCallback(String messageId)
     {
-        targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+        targets.remove(messageId);
         return callbacks.remove(messageId);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Fri Jan 14 19:53:00 2011
@@ -81,15 +81,6 @@ public class ReadCallback<T> implements 
         return blockfor == 1 ? resolver.getData() : resolver.resolve();
     }
 
-    public void close()
-    {
-        MessagingService ms = MessagingService.instance();
-        for (Message response : resolver.getMessages())
-        {
-            ms.removeRegisteredCallback(response.getMessageId());
-        }
-    }
-    
     public void response(Message message)
     {
         resolver.preprocess(message);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Jan 14 19:53:00 2011
@@ -1150,6 +1150,22 @@ public class StorageService implements I
         }
     }
 
+    public void invalidateKeyCaches(String tableName, String... columnFamilies) throws IOException
+    {
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
+        {
+            cfStore.invalidateKeyCache();
+        }
+    }
+
+    public void invalidateRowCaches(String tableName, String... columnFamilies) throws IOException
+    {
+        for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
+        {
+            cfStore.invalidateRowCache();
+        }
+    }
+
     /**
      * Takes the snapshot for a given table.
      *

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Fri Jan 14 19:53:00 2011
@@ -280,4 +280,7 @@ public interface StorageServiceMBean
 
     // to determine if gossip is disabled
     public boolean isInitialized();
+
+    public void invalidateKeyCaches(String ks, String... cfs) throws IOException;
+    public void invalidateRowCaches(String ks, String... cfs) throws IOException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeCmd.java Fri Jan 14 19:53:00 2011
@@ -74,7 +74,7 @@ public class NodeCmd {
         RING, INFO, CFSTATS, SNAPSHOT, CLEARSNAPSHOT, VERSION, TPSTATS, FLUSH, DRAIN,
         DECOMMISSION, MOVE, LOADBALANCE, REMOVETOKEN, REPAIR, CLEANUP, COMPACT,
         SETCACHECAPACITY, GETCOMPACTIONTHRESHOLD, SETCOMPACTIONTHRESHOLD, NETSTATS, CFHISTOGRAMS,
-        COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP
+        COMPACTIONSTATS, DISABLEGOSSIP, ENABLEGOSSIP, INVALIDATEKEYCACHE, INVALIDATEROWCACHE
     }
 
     
@@ -110,6 +110,8 @@ public class NodeCmd {
                          + "repair [keyspace] [cfnames]\n"
                          + "cleanup [keyspace] [cfnames]\n"
                          + "compact [keyspace] [cfnames]\n"
+                         + "invalidatekeycache [keyspace] [cfnames]\n"
+                         + "invalidaterowcache [keyspace] [cfnames]\n"
                          + "getcompactionthreshold <keyspace> <cfname>\n"
                          + "cfhistograms <keyspace> <cfname>\n"
 
@@ -556,6 +558,8 @@ public class NodeCmd {
             case COMPACT :
             case REPAIR  :
             case FLUSH   :
+            case INVALIDATEKEYCACHE :
+            case INVALIDATEROWCACHE :
                 optionalKSandCFs(nc, arguments, probe);
                 break;
 
@@ -615,7 +619,9 @@ public class NodeCmd {
             {
                 switch (nc)
                 {
-                    case REPAIR  : probe.forceTableRepair(keyspace); break;
+                    case REPAIR             : probe.forceTableRepair(keyspace); break;
+                    case INVALIDATEKEYCACHE : probe.invalidateKeyCaches(keyspace); break;
+                    case INVALIDATEROWCACHE : probe.invalidateRowCaches(keyspace); break;
                     case FLUSH   :
                         try { probe.forceTableFlush(keyspace); }
                         catch (ExecutionException ee) { err(ee, "Error occured while flushing keyspace " + keyspace); }
@@ -646,6 +652,8 @@ public class NodeCmd {
             switch (nc)
             {
                 case REPAIR  : probe.forceTableRepair(keyspace, columnFamilies); break;
+                case INVALIDATEKEYCACHE : probe.invalidateKeyCaches(keyspace, columnFamilies); break;
+                case INVALIDATEROWCACHE : probe.invalidateRowCaches(keyspace, columnFamilies); break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }
                     catch (ExecutionException ee) { err(ee, "Error occured during flushing"); }

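With the command table and usage text extended as above, the caches can be dropped from the command line; a hypothetical invocation against a local node (host, JMX port, keyspace and column family are example values; 8080 is the 0.7-era default JMX port):

    bin/nodetool -h 127.0.0.1 -p 8080 invalidatekeycache Keyspace1 Standard1
    bin/nodetool -h 127.0.0.1 -p 8080 invalidaterowcache Keyspace1
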
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Fri Jan 14 19:53:00 2011
@@ -183,7 +183,17 @@ public class NodeProbe
     {
         ssProxy.forceTableRepair(tableName, columnFamilies);
     }
-    
+
+    public void invalidateKeyCaches(String tableName, String... columnFamilies) throws IOException
+    {
+        ssProxy.invalidateKeyCaches(tableName, columnFamilies);
+    }
+
+    public void invalidateRowCaches(String tableName, String... columnFamilies) throws IOException
+    {
+        ssProxy.invalidateRowCaches(tableName, columnFamilies);
+    }
+
     public void drain() throws IOException, InterruptedException, ExecutionException
     {
         ssProxy.drain();	

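The same operations are available programmatically through NodeProbe, the layer nodetool drives. A minimal sketch, assuming a node with JMX reachable on the 0.7 default port and example keyspace/column family names (exception handling omitted):

    import org.apache.cassandra.tools.NodeProbe;

    public class InvalidateCachesExample
    {
        public static void main(String[] args) throws Exception
        {
            NodeProbe probe = new NodeProbe("127.0.0.1", 8080);  // host/port are assumptions
            probe.invalidateKeyCaches("Keyspace1", "Standard1"); // key cache of one CF
            probe.invalidateRowCaches("Keyspace1");              // row caches of all CFs in the keyspace
        }
    }
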
Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableExport.java Fri Jan 14 19:53:00 2011
@@ -251,19 +251,25 @@ public class SSTableExport
 
         outs.println("{");
 
+        SSTableIdentityIterator row;
+
+        boolean elementWritten = false;
         while (scanner.hasNext())
         {
-            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+            row = (SSTableIdentityIterator) scanner.next();
+
             if (excludeSet.contains(bytesToHex(row.getKey().key)))
                 continue;
+            else if (elementWritten)
+                outs.println(",");
+
             try
             {
                 serializeRow(outs, row);
-                outs.print("  ");
-                if (scanner.hasNext())
-                    outs.println(",");
-                else
-                    outs.println();
+
+                // used to decide should we put ',' after previous row or not
+                if (!elementWritten)
+                    elementWritten = true;
             }
             catch (IOException ioexcep)
             {
@@ -277,7 +283,7 @@ public class SSTableExport
             }
         }
         
-        outs.println("}");
+        outs.printf("%n}%n");
         outs.flush();
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/tools/SSTableImport.java Fri Jan 14 19:53:00 2011
@@ -18,13 +18,10 @@
 
 package org.apache.cassandra.tools;
 
-import java.io.FileReader;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.*;
 
 import org.apache.commons.cli.*;
 
@@ -35,10 +32,12 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-import org.json.simple.parser.ParseException;
+import org.codehaus.jackson.type.TypeReference;
+
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.map.MappingJsonFactory;
+
+import org.codehaus.jackson.JsonParser;
 
 import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
 
@@ -48,71 +47,108 @@ import static org.apache.cassandra.utils
 public class SSTableImport
 {
     private static final String KEYSPACE_OPTION = "K";
-    private static final String COLFAM_OPTION = "c";
+    private static final String COLUMN_FAMILY_OPTION = "c";
+    private static final String KEY_COUNT_OPTION = "n";
+    private static final String IS_SORTED_OPTION = "s";
+
     private static Options options;
     private static CommandLine cmd;
 
+    private static Integer keyCountToImport = null;
+    private static boolean isSorted = false;
+
+    private static JsonFactory factory = new MappingJsonFactory();
+
     static
     {
         options = new Options();
-        Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name");
+
+        Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
         optKeyspace.setRequired(true);
         options.addOption(optKeyspace);
-        Option optColfamily = new Option(COLFAM_OPTION, true, "Column family");
+
+        Option optColfamily = new Option(COLUMN_FAMILY_OPTION, true, "Column Family name.");
         optColfamily.setRequired(true);
         options.addOption(optColfamily);
+
+        options.addOption(new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional)."));
+        options.addOption(new Option(IS_SORTED_OPTION, false, "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional)."));
     }
     
-    private static class JsonColumn
+    private static class JsonColumn<T>
     {
-        private String name;
-        private String value;
+        private ByteBuffer name;
+        private ByteBuffer value;
         private long timestamp;
         private boolean isDeleted;
         private int ttl;
         private int localExpirationTime;
-        
-        private JsonColumn(Object obj) throws ClassCastException
+
+        public JsonColumn(T json)
         {
-            JSONArray colSpec = (JSONArray)obj;
-            assert colSpec.size() == 4 || colSpec.size() == 6;
-            name = (String)colSpec.get(0);
-            value = (String)colSpec.get(1);
-            timestamp = (Long)colSpec.get(2);
-            isDeleted = (Boolean)colSpec.get(3);
-            if (colSpec.size() == 6)
+            if (json instanceof List)
             {
-                ttl = (int)(long)((Long)colSpec.get(4));
-                localExpirationTime = (int)(long)((Long)colSpec.get(5));
+                List fields = (List<?>) json;
+
+                assert fields.size() == 4 || fields.size() == 6 : "Column definition should have 4 or 6 fields.";
+
+                name      = ByteBuffer.wrap(hexToBytes((String) fields.get(0)));
+                value     = ByteBuffer.wrap(hexToBytes((String) fields.get(1)));
+                timestamp = (Long) fields.get(2);
+                isDeleted = (Boolean) fields.get(3);
+
+                if (fields.size() == 6)
+                {
+                    ttl = (Integer) fields.get(4);
+                    localExpirationTime = (int) (long) ((Long) fields.get(5));
+                }
             }
         }
+
+        public ByteBuffer getName()
+        {
+            return name.duplicate();
+        }
+
+        public ByteBuffer getValue()
+        {
+            return value.duplicate();
+        }
+    }
+
+    private static void addToStandardCF(List<?> row, ColumnFamily cfamily)
+    {
+        addColumnsToCF(row, null, cfamily);
     }
 
     /**
      * Add columns to a column family.
      * 
      * @param row the columns associated with a row
+     * @param superName name of the super column if any
      * @param cfamily the column family to add columns to
      */
-    private static void addToStandardCF(JSONArray row, ColumnFamily cfamily)
+    private static void addColumnsToCF(List<?> row, ByteBuffer superName, ColumnFamily cfamily)
     {
         CFMetaData cfm = cfamily.metadata();
         assert cfm != null;
+
         for (Object c : row)
         {
-            JsonColumn col = new JsonColumn(c);
-            QueryPath path = new QueryPath(cfm.cfName, null, ByteBuffer.wrap(hexToBytes(col.name)));
+            JsonColumn col = new JsonColumn<List>((List) c);
+            QueryPath path = new QueryPath(cfm.cfName, superName, col.getName());
+
             if (col.ttl > 0)
             {
-                cfamily.addColumn(null, new ExpiringColumn(ByteBuffer.wrap(hexToBytes(col.name)), ByteBuffer.wrap(hexToBytes(col.value)), col.timestamp, col.ttl, col.localExpirationTime));
+                cfamily.addColumn(null, new ExpiringColumn(col.getName(), col.getValue(), col.timestamp, col.ttl, col.localExpirationTime));
             }
             else if (col.isDeleted)
             {
-                cfamily.addTombstone(path, ByteBuffer.wrap(hexToBytes(col.value)), col.timestamp);
+                cfamily.addTombstone(path, col.getValue(), col.timestamp);
             }
             else
             {
-                cfamily.addColumn(path, ByteBuffer.wrap(hexToBytes(col.value)), col.timestamp);
+                cfamily.addColumn(path, col.getValue(), col.timestamp);
             }
         }
     }
@@ -123,38 +159,23 @@ public class SSTableImport
      * @param row the super columns associated with a row
      * @param cfamily the column family to add columns to
      */
-    private static void addToSuperCF(JSONObject row, ColumnFamily cfamily)
+    private static void addToSuperCF(Map<?, ?> row, ColumnFamily cfamily)
     {
-        CFMetaData cfm = cfamily.metadata();
-        assert cfm != null;
+        CFMetaData metaData = cfamily.metadata();
+        assert metaData != null;
+
         // Super columns
-        for (Map.Entry<String, JSONObject> entry : (Set<Map.Entry<String, JSONObject>>)row.entrySet())
+        for (Map.Entry<?, ?> entry : row.entrySet())
         {
-            ByteBuffer superName = ByteBuffer.wrap(hexToBytes(entry.getKey()));
-            long deletedAt = (Long)entry.getValue().get("deletedAt");
-            JSONArray subColumns = (JSONArray)entry.getValue().get("subColumns");
-            
-            // Add sub-columns
-            for (Object c : subColumns)
-            {
-                JsonColumn col = new JsonColumn(c);
-                QueryPath path = new QueryPath(cfm.cfName, superName, ByteBuffer.wrap(hexToBytes(col.name)));
-                if (col.ttl > 0)
-                {
-                    cfamily.addColumn(superName, new ExpiringColumn(ByteBuffer.wrap(hexToBytes(col.name)), ByteBuffer.wrap(hexToBytes(col.value)), col.timestamp, col.ttl, col.localExpirationTime));
-                }
-                else if (col.isDeleted)
-                {
-                    cfamily.addTombstone(path, ByteBuffer.wrap(hexToBytes(col.value)), col.timestamp);
-                }
-                else
-                {
-                    cfamily.addColumn(path, ByteBuffer.wrap(hexToBytes(col.value)), col.timestamp);
-                }
-            }
-            
-            SuperColumn superColumn = (SuperColumn)cfamily.getColumn(superName);
-            superColumn.markForDeleteAt((int)(System.currentTimeMillis()/1000), deletedAt);
+            ByteBuffer superName = ByteBuffer.wrap(hexToBytes((String) entry.getKey()));
+            Map<?, ?> data = (Map<?, ?>) entry.getValue();
+
+            addColumnsToCF((List<?>) data.get("subColumns"), superName, cfamily);
+
+            // *WARNING* markForDeleteAt has been DEPRECATED at Cassandra side
+            //BigInteger deletedAt = (BigInteger) data.get("deletedAt");
+            //SuperColumn superColumn = (SuperColumn) cfamily.getColumn(superName);
+            //superColumn.markForDeleteAt((int) (System.currentTimeMillis()/1000), deletedAt);
         }
     }
 
@@ -165,44 +186,181 @@ public class SSTableImport
      * @param keyspace keyspace the data belongs to
      * @param cf column family the data belongs to
      * @param ssTablePath file to write the SSTable to
+     *
      * @throws IOException for errors reading/writing input/output
-     * @throws ParseException for errors encountered parsing JSON input
      */
-    public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath)
-    throws IOException, ParseException
+    public static void importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException
     {
-        ColumnFamily cfamily = ColumnFamily.create(keyspace, cf);
-        ColumnFamilyType cfType = cfamily.getColumnFamilyType();    // Super or Standard
+        ColumnFamily columnFamily = ColumnFamily.create(keyspace, cf);
         IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
 
-        try
+        int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner)
+                                      : importUnsorted(getParser(jsonFile), columnFamily, ssTablePath, partitioner);
+
+        if (importedKeys != -1)
+            System.out.printf("%d keys imported successfully.%n", importedKeys);
+    }
+
+    private static int importUnsorted(JsonParser parser, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException
+    {
+        int importedKeys = 0;
+        long start = System.currentTimeMillis();
+        Map<?, ?> data = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
+
+        keyCountToImport = (keyCountToImport == null) ? data.size() : keyCountToImport;
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+
+        System.out.printf("Importing %s keys...%n", keyCountToImport);
+
+        // sort by dk representation, but hold onto the hex version
+        SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>();
+
+        for (Object keyObject : data.keySet())
         {
-            JSONObject json = (JSONObject)JSONValue.parseWithException(new FileReader(jsonFile));
-            
-            SSTableWriter writer = new SSTableWriter(ssTablePath, json.size());
-            SortedMap<DecoratedKey,String> decoratedKeys = new TreeMap<DecoratedKey,String>();
-            
-            // sort by dk representation, but hold onto the hex version
-            for (String key : (Set<String>)json.keySet())
-                decoratedKeys.put(partitioner.decorateKey(ByteBuffer.wrap(hexToBytes(key))), key);
+            String key = (String) keyObject;
+            decoratedKeys.put(partitioner.decorateKey(ByteBuffer.wrap(hexToBytes(key))), key);
+        }
 
-            for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet())
+        for (Map.Entry<DecoratedKey, String> rowKey : decoratedKeys.entrySet())
+        {
+            if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super)
             {
-                if (cfType == ColumnFamilyType.Super)
-                    addToSuperCF((JSONObject)json.get(rowKey.getValue()), cfamily);
-                else
-                    addToStandardCF((JSONArray)json.get(rowKey.getValue()), cfamily);
-                           
-                writer.append(rowKey.getKey(), cfamily);
-                cfamily.clear();
+                addToSuperCF((Map<?, ?>) data.get(rowKey.getValue()), columnFamily);
             }
-            
-            writer.closeAndOpenReader();
+            else
+            {
+                addToStandardCF((List<?>) data.get(rowKey.getValue()), columnFamily);
+            }
+
+            writer.append(rowKey.getKey(), columnFamily);
+            columnFamily.clear();
+
+            importedKeys++;
+
+            long current = System.currentTimeMillis();
+
+            if (current - start >= 5000) // 5 secs.
+            {
+                System.out.printf("Currently imported %d keys.%n", importedKeys);
+                start = current;
+            }
+
+            if (keyCountToImport == importedKeys)
+                break;
         }
-        catch (ClassCastException cce)
+
+        writer.closeAndOpenReader();
+
+        return importedKeys;
+    }
+
+    public static int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, IPartitioner<?> partitioner) throws IOException
+    {
+        int importedKeys = 0; // already imported keys count
+        long start = System.currentTimeMillis();
+
+        JsonParser parser = getParser(jsonFile);
+
+        if (keyCountToImport == null)
         {
-            throw new RuntimeException("Invalid JSON input, or incorrect column family.", cce);
+            keyCountToImport = 0;
+            System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
+
+            parser.nextToken(); // START_OBJECT
+            while (parser.nextToken() != null)
+            {
+                parser.nextToken();
+                parser.skipChildren();
+                if (parser.getCurrentName() == null) continue;
+
+                keyCountToImport++;
+            }
+        }
+
+        System.out.printf("Importing %s keys...%n", keyCountToImport);
+
+        parser = getParser(jsonFile); // renewing parser
+        SSTableWriter writer = new SSTableWriter(ssTablePath, keyCountToImport);
+
+        int lineNumber = 1;
+        DecoratedKey prevStoredKey = null;
+
+        while (parser.nextToken() != null)
+        {
+            String key = parser.getCurrentName();
+
+            if (key != null)
+            {
+                String tokenName = parser.nextToken().name();
+
+                if (tokenName.equals("START_ARRAY"))
+                {
+                    if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Super)
+                    {
+                        throw new RuntimeException("Can't write Standard columns to the Super Column Family.");
+                    }
+
+                    List<?> columns = parser.readValueAs(new TypeReference<List<?>>() {});
+                    addToStandardCF(columns, columnFamily);
+                }
+                else if (tokenName.equals("START_OBJECT"))
+                {
+                    if (columnFamily.getColumnFamilyType() == ColumnFamilyType.Standard)
+                    {
+                        throw new RuntimeException("Can't write Super columns to the Standard Column Family.");
+                    }
+
+                    Map<?, ?> columns = parser.readValueAs(new TypeReference<Map<?, ?>>() {});
+                    addToSuperCF(columns, columnFamily);
+                }
+                else
+                {
+                    throw new UnsupportedOperationException("Only Array or Hash allowed as row content.");
+                }
+
+                DecoratedKey currentKey = partitioner.decorateKey(ByteBuffer.wrap(hexToBytes(key)));
+
+                if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
+                {
+                    System.err.printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n", lineNumber, key);
+                    return -1;
+                }
+
+                // saving decorated key
+                writer.append(currentKey, columnFamily);
+                columnFamily.clear();
+
+                prevStoredKey = currentKey;
+                importedKeys++;
+                lineNumber++;
+
+                long current = System.currentTimeMillis();
+
+                if (current - start >= 5000) // 5 secs.
+                {
+                    System.out.printf("Currently imported %d keys.%n", importedKeys);
+                    start = current;
+                }
+
+                if (keyCountToImport == importedKeys)
+                    break;
+            }
         }
+
+        writer.closeAndOpenReader();
+
+        return importedKeys;
+    }
+
+    /**
+     * Get JsonParser object for file
+     * @param fileName name of the file
+     * @return json parser instance for given file
+     * @throws IOException if any I/O error.
+     */
+    private static JsonParser getParser(String fileName) throws IOException
+    {
+        return factory.createJsonParser(new File(fileName));
     }
 
     /**
@@ -212,33 +370,43 @@ public class SSTableImport
      * @param args command line arguments
      * @throws IOException on failure to open/read/write files or output streams
      * @throws ParseException on failure to parse JSON input
+     * @throws ConfigurationException on configuration error.
      */
     public static void main(String[] args) throws IOException, ParseException, ConfigurationException
     {
-        String usage = String.format("Usage: %s -K keyspace -c column_family <json> <sstable>%n",
-                SSTableImport.class.getName());
-
         CommandLineParser parser = new PosixParser();
+
         try
         {
             cmd = parser.parse(options, args);
-        } catch (org.apache.commons.cli.ParseException e1)
+        }
+        catch (org.apache.commons.cli.ParseException e)
         {
-            System.err.println(e1.getMessage());
-            System.err.println(usage);
+            System.err.println(e.getMessage());
+            printProgramUsage();
             System.exit(1);
         }
 
         if (cmd.getArgs().length != 2)
         {
-            System.err.println(usage);
+            printProgramUsage();
             System.exit(1);
         }
 
-        String json = cmd.getArgs()[0];
-        String ssTable = cmd.getArgs()[1];
+        String json     = cmd.getArgs()[0];
+        String ssTable  = cmd.getArgs()[1];
         String keyspace = cmd.getOptionValue(KEYSPACE_OPTION);
-        String cfamily = cmd.getOptionValue(COLFAM_OPTION);
+        String cfamily  = cmd.getOptionValue(COLUMN_FAMILY_OPTION);
+
+        if (cmd.hasOption(KEY_COUNT_OPTION))
+        {
+            keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION));
+        }
+
+        if (cmd.hasOption(IS_SORTED_OPTION))
+        {
+            isSorted = true;
+        }
 
         DatabaseDescriptor.loadSchemas();
         if (DatabaseDescriptor.getNonSystemTables().size() < 1)
@@ -248,9 +416,40 @@ public class SSTableImport
             throw new ConfigurationException(msg);
         }
 
-        importJson(json, keyspace, cfamily, ssTable);
-        
+        try
+        {
+            importJson(json, keyspace, cfamily, ssTable);
+        }
+        catch (Exception e)
+        {
+            e.printStackTrace();
+            System.err.println("ERROR: " + e.getMessage());
+            System.exit(-1);
+        }
+
         System.exit(0);
     }
 
+    private static void printProgramUsage()
+    {
+        System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
+                            SSTableImport.class.getName());
+
+        System.out.println("Options:");
+        for (Object o :  options.getOptions())
+        {
+            Option opt = (Option) o;
+            System.out.println("  -" +opt.getOpt() + " - " + opt.getDescription());
+        }
+    }
+
+    /**
+     * Used by test framework to set key count
+     * @param keyCount numbers of keys to import
+     */
+    public static void setKeyCountToImport(Integer keyCount)
+    {
+        keyCountToImport = keyCount;
+    }
+
 }

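For the new options in context: -s tells the importer the JSON is already key-sorted (e.g. straight from sstable2json) so rows can be streamed one at a time, and -n skips the key-counting pass. A minimal programmatic sketch in the spirit of the test-framework hook above; the file paths, keyspace and column family names are hypothetical, and schema loading is assumed to happen as in main():

    import org.apache.cassandra.config.DatabaseDescriptor;
    import org.apache.cassandra.tools.SSTableImport;

    // Sketch only: drives the importer the way main()/the unit tests do (unsorted path).
    public class ImportExample
    {
        public static void main(String[] args) throws Exception
        {
            DatabaseDescriptor.loadSchemas();            // schema must be known, as in main() above
            SSTableImport.setKeyCountToImport(1000);     // optional; same effect as -n 1000
            SSTableImport.importJson("/tmp/Standard1.json",                    // from sstable2json
                                     "Keyspace1", "Standard1",                 // hypothetical names
                                     "/tmp/Keyspace1-Standard1-f-1-Data.db");  // hypothetical output
        }
    }

The equivalent command-line form would be something like (paths hypothetical):

    bin/json2sstable -s -K Keyspace1 -c Standard1 -n 1000 /tmp/Standard1.json /tmp/Keyspace1-Standard1-f-1-Data.db
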
Modified: cassandra/trunk/test/resources/SimpleCF.json
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/resources/SimpleCF.json?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/test/resources/SimpleCF.json (original)
+++ cassandra/trunk/test/resources/SimpleCF.json Fri Jan 14 19:53:00 2011
@@ -1,4 +1,4 @@
 {
- "726f7741": [["636f6c4141", "76616c4141", 1, false], ["636f6c4142", "76616c4142", 1, false], ["636f6c4143", "76616c4143", 1, false, 42, 2000000000 ]],
- "726f7742": [["636f6c4241", "76616c4241", 1, false], ["636f6c4242", "76616c4242", 1, false]]
+ "726f7741": [["636f6c4141", "76616c4141", 1294532915068, false], ["636f6c4142", "76616c4142", 1294532915069, false], ["636f6c4143", "76616c4143", 1294532915071, false, 42, 2000000000 ]],
+ "726f7742": [["636f6c4241", "76616c4241", 1294532915070, false], ["636f6c4242", "76616c4242", 1294532915073, false]]
 }

Modified: cassandra/trunk/test/resources/SuperCF.json
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/resources/SuperCF.json?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/test/resources/SuperCF.json (original)
+++ cassandra/trunk/test/resources/SuperCF.json Fri Jan 14 19:53:00 2011
@@ -1,4 +1,4 @@
 {
-  "726f7741": {"737570657241": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4141", "76616c75654141", 1, false], ["636f6c4142", "76616c75654142", 1, false]]}},
-  "726f7742": {"737570657242": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4241", "76616c75654241", 1, false], ["636f6c4242", "76616c75654242", 1, false]]}}
+  "726f7741": {"737570657241": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4141", "76616c75654141", 1294532915069, false], ["636f6c4142", "76616c75654142", 1294532915069, false]]}},
+  "726f7742": {"737570657242": {"deletedAt": -9223372036854775808, "subColumns": [["636f6c4241", "76616c75654241", 1294532915069, false], ["636f6c4242", "76616c75654242", 1294532915069, false]]}}
 }

Added: cassandra/trunk/test/resources/UnsortedSuperCF.json
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/resources/UnsortedSuperCF.json?rev=1059145&view=auto
==============================================================================
--- cassandra/trunk/test/resources/UnsortedSuperCF.json (added)
+++ cassandra/trunk/test/resources/UnsortedSuperCF.json Fri Jan 14 19:53:00 2011
@@ -0,0 +1,5 @@
+{
+  "303935": { "5330": {"deletedAt": -9223372036854775808, "subColumns": [["4330", "366338333439636337323630", 1294656637116, false], ["4331", "366338333439636337323630", 1294656637116, false], ["4332", "366338333439636337323630", 1294656637116, false], ["4333", "366338333439636337323630", 1294656637116, false], ["4334", "366338333439636337323630", 1294656637116, false]]}}  , 
+  "303630": { "5330": {"deletedAt": -9223372036854775808, "subColumns": [["4330", "643364393434363830326134", 1294656636902, false], ["4331", "643364393434363830326134", 1294656636902, false], ["4332", "643364393434363830326134", 1294656636902, false], ["4333", "643364393434363830326134", 1294656636902, false], ["4334", "643364393434363830326134", 1294656636902, false]]}}  ,
+  "303638": { "5330": {"deletedAt": -9223372036854775808, "subColumns": [["4330", "366634393232663435353638", 1294656636885, false], ["4331", "366634393232663435353638", 1294656636885, false], ["4332", "366634393232663435353638", 1294656636885, false], ["4333", "366634393232663435353638", 1294656636885, false], ["4334", "366634393232663435353638", 1294656636885, false]]}}  
+}
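
For reference, the row keys above are hex-encoded ASCII; decoded they read "095", "060", "068", i.e. deliberately not in ascending order, which is what the new testImportUnsortedMode below relies on when it asserts that importSorted returns -1 for this file. A small standalone illustration (the class and helper names are illustrative only, not part of this commit):

    // Decodes the hex-encoded row keys of UnsortedSuperCF.json and prints them.
    public class DecodeUnsortedKeys
    {
        private static String hexToAscii(String hex)
        {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < hex.length(); i += 2)
                sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
            return sb.toString();
        }

        public static void main(String[] args)
        {
            for (String key : new String[]{ "303935", "303630", "303638" })
                System.out.println(key + " -> " + hexToAscii(key));
            // prints: 303935 -> 095, 303630 -> 060, 303638 -> 068
        }
    }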

Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableExportTest.java Fri Jan 14 19:53:00 2011
@@ -65,12 +65,12 @@ public class SSTableExportTest extends S
         SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colA")), ByteBufferUtil.bytes("valA"), 1);
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colA")), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
         
         // Add rowB
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colB")), ByteBufferUtil.bytes("valB"), 1);
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colB")), ByteBufferUtil.bytes("valB"), System.currentTimeMillis());
         writer.append(Util.dk("rowB"), cfamily);
         cfamily.clear();
      
@@ -99,18 +99,18 @@ public class SSTableExportTest extends S
         
         int nowInSec = (int)(System.currentTimeMillis() / 1000) + 42; //live for 42 seconds
         // Add rowA
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colA")), ByteBufferUtil.bytes("valA"), 1);
-        cfamily.addColumn(null, new ExpiringColumn(ByteBufferUtil.bytes("colExp"), ByteBufferUtil.bytes("valExp"), 1, 42, nowInSec));
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colA")), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
+        cfamily.addColumn(null, new ExpiringColumn(ByteBufferUtil.bytes("colExp"), ByteBufferUtil.bytes("valExp"), System.currentTimeMillis(), 42, nowInSec));
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
         
         // Add rowB
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colB")), ByteBufferUtil.bytes("valB"), 1);
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colB")), ByteBufferUtil.bytes("valB"), System.currentTimeMillis());
         writer.append(Util.dk("rowB"), cfamily);
         cfamily.clear();
 
         // Add rowExclude
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colX")), ByteBufferUtil.bytes("valX"), 1);
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("colX")), ByteBufferUtil.bytes("valX"), System.currentTimeMillis());
         writer.append(Util.dk("rowExclude"), cfamily);
         cfamily.clear();
 
@@ -146,17 +146,17 @@ public class SSTableExportTest extends S
         SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
-        cfamily.addColumn(new QueryPath("Super4", ByteBufferUtil.bytes("superA"), ByteBufferUtil.bytes("colA")), ByteBufferUtil.bytes("valA"), 1);
+        cfamily.addColumn(new QueryPath("Super4", ByteBufferUtil.bytes("superA"), ByteBufferUtil.bytes("colA")), ByteBufferUtil.bytes("valA"), System.currentTimeMillis());
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
         
         // Add rowB
-        cfamily.addColumn(new QueryPath("Super4", ByteBufferUtil.bytes("superB"), ByteBufferUtil.bytes("colB")), ByteBufferUtil.bytes("valB"), 1);
+        cfamily.addColumn(new QueryPath("Super4", ByteBufferUtil.bytes("superB"), ByteBufferUtil.bytes("colB")), ByteBufferUtil.bytes("valB"), System.currentTimeMillis());
         writer.append(Util.dk("rowB"), cfamily);
         cfamily.clear();
 
         // Add rowExclude
-        cfamily.addColumn(new QueryPath("Super4", ByteBufferUtil.bytes("superX"), ByteBufferUtil.bytes("colX")), ByteBufferUtil.bytes("valX"), 1);
+        cfamily.addColumn(new QueryPath("Super4", ByteBufferUtil.bytes("superX"), ByteBufferUtil.bytes("colX")), ByteBufferUtil.bytes("valX"), System.currentTimeMillis());
         writer.append(Util.dk("rowExclude"), cfamily);
         cfamily.clear();
 
@@ -186,12 +186,12 @@ public class SSTableExportTest extends S
         SSTableWriter writer = new SSTableWriter(tempSS.getPath(), 2);
         
         // Add rowA
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("name")), ByteBufferUtil.bytes("val"), 1);
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("name")), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
         writer.append(Util.dk("rowA"), cfamily);
         cfamily.clear();
 
         // Add rowExclude
-        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("name")), ByteBufferUtil.bytes("val"), 1);
+        cfamily.addColumn(new QueryPath("Standard1", null, ByteBufferUtil.bytes("name")), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
         writer.append(Util.dk("rowExclude"), cfamily);
         cfamily.clear();
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java?rev=1059145&r1=1059144&r2=1059145&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/tools/SSTableImportTest.java Fri Jan 14 19:53:00 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.tools;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,6 +31,7 @@ import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import static org.apache.cassandra.utils.FBUtilities.hexToBytes;
@@ -44,13 +44,10 @@ import org.json.simple.parser.ParseExcep
 import org.junit.Test;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-
-import org.apache.cassandra.utils.FBUtilities;
-
 public class SSTableImportTest extends SchemaLoader
 {   
     @Test
-    public void testImportSimpleCf() throws IOException, ParseException
+    public void testImportSimpleCf() throws IOException
     {
         // Import JSON to temp SSTable file
         String jsonUrl = getClass().getClassLoader().getResource("SimpleCF.json").getPath();
@@ -68,7 +65,7 @@ public class SSTableImportTest extends S
         IColumn expCol = cf.getColumn(ByteBufferUtil.bytes("colAC"));
         assert expCol.value().equals(ByteBuffer.wrap(hexToBytes("76616c4143")));
         assert expCol instanceof ExpiringColumn;
-        assert ((ExpiringColumn)expCol).getTimeToLive() == 42 && ((ExpiringColumn)expCol).getLocalDeletionTime() == 2000000000;
+        assert ((ExpiringColumn)expCol).getTimeToLive() == 42 && expCol.getLocalDeletionTime() == 2000000000;
     }
 
     @Test
@@ -88,4 +85,18 @@ public class SSTableImportTest extends S
         IColumn subColumn = superCol.getSubColumn(ByteBufferUtil.bytes("colAA"));
         assert subColumn.value().equals(ByteBuffer.wrap(hexToBytes("76616c75654141")));
     }
+
+    @Test
+    public void testImportUnsortedMode() throws IOException
+    {
+        String jsonUrl = getClass().getClassLoader().getResource("UnsortedSuperCF.json").getPath();
+        File tempSS = tempSSTableFile("Keyspace1", "Super4");
+
+        ColumnFamily columnFamily = ColumnFamily.create("Keyspace1", "Super4");
+        IPartitioner<?> partitioner = DatabaseDescriptor.getPartitioner();
+
+        SSTableImport.setKeyCountToImport(3);
+        int result = SSTableImport.importSorted(jsonUrl, columnFamily, tempSS.getPath(), partitioner);
+        assert result == -1;
+    }
 }


