cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r984904 - in /cassandra/trunk: ./ contrib/pig/ contrib/pig/bin/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ contrib/word_count/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/hadoop/ test/unit/org/apache/cassa...
Date Thu, 12 Aug 2010 18:32:12 GMT
Author: jbellis
Date: Thu Aug 12 18:32:12 2010
New Revision: 984904

URL: http://svn.apache.org/viewvc?rev=984904&view=rev
Log:
Remove references to DatabaseDescriptor from Pig, RingCache, and CFRW, and remove it as a
fallback from CFRR. cassandra.yaml is no longer needed by pig or word_count.
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1322

Removed:
    cassandra/trunk/contrib/pig/cassandra.yaml
    cassandra/trunk/contrib/word_count/cassandra.yaml
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/pig/README.txt
    cassandra/trunk/contrib/pig/bin/pig_cassandra   (contents, props changed)
    cassandra/trunk/contrib/pig/build.xml
    cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
    cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
    cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Aug 12 18:32:12 2010
@@ -1,4 +1,9 @@
-0.7.0-beta1
+dev
+ * remove cassandra.yaml dependency from Hadoop and Pig (CASSANDRA-1322)
+ * expose CfDef metadata in describe_keyspaces (CASSANDRA-1633)
+
+
+0.7-beta1
  * sstable versioning (CASSANDRA-389)
  * switched to slf4j logging (CASSANDRA-625)
  * access levels for authentication/authorization (CASSANDRA-900)

Modified: cassandra/trunk/contrib/pig/README.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/README.txt?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/README.txt (original)
+++ cassandra/trunk/contrib/pig/README.txt Thu Aug 12 18:32:12 2010
@@ -3,11 +3,19 @@ A Pig LoadFunc that reads all columns fr
 Setup:
 
 First build and start a Cassandra server with the default
-configuration* and set the PIG_HOME and JAVA_HOME environment
+configuration and set the PIG_HOME and JAVA_HOME environment
 variables to the location of a Pig >= 0.7.0 install and your Java
-install. If you would like to run using the Hadoop backend, you should
+install. 
+
+If you would like to run using the Hadoop backend, you should
 also set PIG_CONF_DIR to the location of your Hadoop config.
 
+Finally, set the following as environment variables (uppercase,
+underscored), or as Hadoop configuration variables (lowercase, dotted):
+* PIG_RPC_PORT or cassandra.thrift.port : the port thrift is listening on 
+* PIG_INITIAL_ADDRESS or cassandra.thrift.address : initial address to connect to
+* PIG_PARTITIONER or cassandra.partitioner.class : cluster partitioner
+
 Run:
 
 contrib/pig$ ant
@@ -32,6 +40,3 @@ grunt> namecounts = FOREACH namegroups G
 grunt> orderednames = ORDER namecounts BY $0;
 grunt> topnames = LIMIT orderednames 50;
 grunt> dump topnames;
-
-*If you want to point Pig at a real cluster, modify the seed
-address in cassandra.yaml and re-run the build step.

Modified: cassandra/trunk/contrib/pig/bin/pig_cassandra
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/bin/pig_cassandra?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
    (empty)

Propchange: cassandra/trunk/contrib/pig/bin/pig_cassandra
------------------------------------------------------------------------------
    svn:executable = *

Modified: cassandra/trunk/contrib/pig/build.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/build.xml?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/build.xml (original)
+++ cassandra/trunk/contrib/pig/build.xml Thu Aug 12 18:32:12 2010
@@ -30,7 +30,7 @@
     <property name="final.name" value="cassandra_loadfunc" />
 
     <path id="pig.classpath">
-        <fileset file="${env.PIG_HOME}/pig*core.jar" />
+        <fileset file="${env.PIG_HOME}/pig*.jar" />
         <fileset dir="${cassandra.dir}/lib">
             <include name="libthrift*.jar" />
         </fileset>

Modified: cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++ cassandra/trunk/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Thu Aug 12 18:32:12 2010
@@ -46,6 +46,12 @@ import org.apache.pig.data.TupleFactory;
  */
 public class CassandraStorage extends LoadFunc
 {
+    // system environment variables that can be set to configure connection info:
+    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+
     private final static byte[] BOUND = new byte[0];
     private final static int LIMIT = 1024;
 
@@ -135,6 +141,14 @@ public class CassandraStorage extends Lo
         conf = job.getConfiguration();
         ConfigHelper.setInputSlicePredicate(conf, predicate);
         ConfigHelper.setInputColumnFamily(conf, ksname, cfname);
+
+        // check the environment for connection information
+        if (System.getenv(PIG_RPC_PORT) != null)
+            ConfigHelper.setRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+            ConfigHelper.setInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+        if (System.getenv(PIG_PARTITIONER) != null)
+            ConfigHelper.setPartitioner(conf, System.getenv(PIG_PARTITIONER));
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/client/RingCache.java Thu Aug 12 18:32:12
2010
@@ -19,8 +19,8 @@ package org.apache.cassandra.client;
 
 import java.util.*;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
@@ -29,7 +29,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.TokenRange;
@@ -40,8 +39,8 @@ import org.apache.thrift.protocol.TBinar
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 
-import com.google.common.collect.BiMap;
-import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.HashMultimap;
 
 /**
  *  A class for caching the ring map at the client. For usage example, see
@@ -51,22 +50,21 @@ public class RingCache
 {
     final private static Logger logger_ = LoggerFactory.getLogger(RingCache.class);
 
-    private Set<String> seeds_ = new HashSet<String>();
-    final private int port_= DatabaseDescriptor.getRpcPort();
-    final private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
+    private final Set<String> seeds_ = new HashSet<String>();
+    private final int port_;
+    private final IPartitioner partitioner_;
     private final String keyspace;
-    private TokenMetadata tokenMetadata;
 
-    public RingCache(String keyspace) throws IOException
+    private Set<Range> rangeSet;
+    private Multimap<Range, InetAddress> rangeMap;
+
+    public RingCache(String keyspace, IPartitioner partitioner, String addresses, int port)
throws IOException
     {
-        for (InetAddress seed : DatabaseDescriptor.getSeeds())
-        {
-            seeds_.add(seed.getHostAddress());
-        }
-        
+        for (String seed : addresses.split(","))
+            seeds_.add(seed);
+        this.port_ = port;
         this.keyspace = keyspace;
-        
-        DatabaseDescriptor.loadSchemas();
+        this.partitioner_ = partitioner;
         refreshEndpointMap();
     }
 
@@ -82,16 +80,17 @@ public class RingCache
                 socket.open();
 
                 List<TokenRange> ring = client.describe_ring(keyspace);
-                BiMap<Token, InetAddress> tokenEndpointMap = HashBiMap.create();
+                rangeMap = HashMultimap.create();
                 
                 for (TokenRange range : ring)
                 {
-                    Token<?> token = StorageService.getPartitioner().getTokenFactory().fromString(range.start_token);
+                    Token<?> left = partitioner_.getTokenFactory().fromString(range.start_token);
+                    Token<?> right = partitioner_.getTokenFactory().fromString(range.end_token);
                     String host = range.endpoints.get(0);
                     
                     try
                     {
-                        tokenEndpointMap.put(token, InetAddress.getByName(host));
+                        rangeMap.put(new Range(left, right, partitioner_), InetAddress.getByName(host));
                     }
                     catch (UnknownHostException e)
                     {
@@ -99,7 +98,7 @@ public class RingCache
                     }
                 }
 
-                tokenMetadata = new TokenMetadata(tokenEndpointMap);
+                rangeSet = new HashSet(rangeMap.keySet());
 
                 break;
             }
@@ -115,11 +114,17 @@ public class RingCache
         }
     }
 
-    public List<InetAddress> getEndpoint(byte[] key)
+    public Collection<InetAddress> getEndpoint(byte[] key)
     {
-        if (tokenMetadata == null)
+        if (rangeSet == null)
             throw new RuntimeException("Must refresh endpoints before looking up a key.");
-        AbstractReplicationStrategy strat = StorageService.createReplicationStrategy(tokenMetadata,
keyspace);
-        return strat.getNaturalEndpoints(partitioner_.getToken(key));
+
+        // TODO: naive linear search of the token map
+        Token<?> t = partitioner_.getToken(key);
+        for (Range range : rangeSet)
+            if (range.contains(t))
+                return rangeMap.get(range);
+
+        throw new RuntimeException("Invalid token information returned by describe_ring:
" + rangeMap);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Thu
Aug 12 18:32:12 2010
@@ -32,7 +32,6 @@ import org.apache.cassandra.auth.AllowAl
 import org.apache.cassandra.auth.SimpleAuthenticator;
 
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.clock.TimestampReconciler;

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Thu
Aug 12 18:32:12 2010
@@ -33,7 +33,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Clock;
@@ -106,7 +105,10 @@ final class ColumnFamilyRecordWriter ext
     {
         this.context = context;
         this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
-        this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
+        this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()),
+                                       ConfigHelper.getPartitioner(context.getConfiguration()),
+                                       ConfigHelper.getInitialAddress(context.getConfiguration()),
+                                       ConfigHelper.getRpcPort(context.getConfiguration()));
         this.batchThreshold = context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD,
Long.MAX_VALUE);
     }
     
@@ -120,10 +122,7 @@ final class ColumnFamilyRecordWriter ext
      */
     protected InetAddress getEndpoint(byte[] key)
     {
-        List<InetAddress> endpoints = ringCache.getEndpoint(key);
-        return endpoints != null && endpoints.size() > 0
-               ? endpoints.get(0)
-               : null;
+        return ringCache.getEndpoint(key).iterator().next();
     }
 
     /**
@@ -327,7 +326,7 @@ final class ColumnFamilyRecordWriter ext
             TSocket socket = null;
             try
             {
-                socket = new TSocket(endpoint.getHostName(), DatabaseDescriptor.getRpcPort());
+                socket = new TSocket(endpoint.getHostName(), ConfigHelper.getRpcPort(taskContext.getConfiguration()));
                 Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket,
taskContext);
                 client.batch_mutate(mutations, ConsistencyLevel.ONE);
                 return null;

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ConfigHelper.java Thu Aug 12 18:32:12
2010
@@ -20,8 +20,7 @@ package org.apache.cassandra.hadoop;
  * 
  */
 
-
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +31,7 @@ import org.apache.thrift.protocol.TBinar
 
 public class ConfigHelper
 {
+    private static final String PARTITIONER_CONFIG = "cassandra.partitioner.class";
     private static final String INPUT_KEYSPACE_CONFIG = "cassandra.input.keyspace";
     private static final String OUTPUT_KEYSPACE_CONFIG = "cassandra.output.keyspace";
     private static final String INPUT_KEYSPACE_USERNAME_CONFIG = "cassandra.input.keyspace.username";
@@ -95,21 +95,6 @@ public class ConfigHelper
     }
 
     /**
-     * The address and port of a Cassandra node that Hadoop can contact over Thrift
-     * to learn more about the Cassandra cluster.  Optional when storage-conf.xml
-     * is provided.
-     *
-     * @param conf
-     * @param address
-     * @param port
-     */
-    public static void setThriftContact(Configuration conf, String address, int port)
-    {
-        conf.set(THRIFT_PORT, String.valueOf(port));
-        conf.set(INITIAL_THRIFT_ADDRESS, address);
-    }
-
-    /**
      * The number of rows to request with each get range slices request.
      * Too big and you can either get timeouts when it takes Cassandra too
      * long to fetch all the data. Too small and the performance
@@ -244,13 +229,31 @@ public class ConfigHelper
 
     public static int getRpcPort(Configuration conf)
     {
-        String v = conf.get(THRIFT_PORT);
-        return v == null ? DatabaseDescriptor.getRpcPort() : Integer.valueOf(v);
+        return Integer.valueOf(conf.get(THRIFT_PORT));
+    }
+
+    public static void setRpcPort(Configuration conf, String port)
+    {
+        conf.set(THRIFT_PORT, port);
     }
 
     public static String getInitialAddress(Configuration conf)
     {
-        String v = conf.get(INITIAL_THRIFT_ADDRESS);
-        return v == null ? DatabaseDescriptor.getSeeds().iterator().next().getHostAddress()
: v;
+        return conf.get(INITIAL_THRIFT_ADDRESS);
+    }
+
+    public static void setInitialAddress(Configuration conf, String address)
+    {
+        conf.set(INITIAL_THRIFT_ADDRESS, address);
+    }
+
+    public static void setPartitioner(Configuration conf, String classname)
+    {
+        conf.set(PARTITIONER_CONFIG, classname); 
+    }
+
+    public static IPartitioner getPartitioner(Configuration conf)
+    {
+        return FBUtilities.newPartitioner(conf.get(PARTITIONER_CONFIG)); 
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=984904&r1=984903&r2=984904&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java Thu Aug 12 18:32:12
2010
@@ -19,23 +19,16 @@ package org.apache.cassandra.client;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collection;
 
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.Clock;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.thrift.*;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 
-import org.apache.cassandra.thrift.AuthenticationRequest;
-
 /**
  *  Sample code that uses RingCache in the client.
  */
@@ -46,7 +39,8 @@ public class TestRingCache
 
     public TestRingCache(String keyspace) throws IOException
     {
-    	ringCache = new RingCache(keyspace);
+        String seed = DatabaseDescriptor.getSeeds().iterator().next().getHostAddress();
+    	ringCache = new RingCache(keyspace, DatabaseDescriptor.getPartitioner(), seed, DatabaseDescriptor.getRpcPort());
     }
     
     private void setup(String server, int port) throws Exception
@@ -97,14 +91,13 @@ public class TestRingCache
             ColumnPath col = new ColumnPath("Standard1").setSuper_column(null).setColumn("col1".getBytes());
             ColumnParent parent = new ColumnParent("Standard1").setSuper_column(null);
 
-            List<InetAddress> endpoints = tester.ringCache.getEndpoint(row);
-            String hosts="";
-            for (int i = 0; i < endpoints.size(); i++)
-                hosts = hosts + ((i > 0) ? "," : "") + endpoints.get(i);
-            System.out.println("hosts with key " + new String(row) + " : " + hosts + "; choose
" + endpoints.get(0));
+            Collection<InetAddress> endpoints = tester.ringCache.getEndpoint(row);
+            InetAddress firstEndpoint = endpoints.iterator().next();
+            System.out.printf("hosts with key %s : %s; choose %s%n",
+                              new String(row), StringUtils.join(endpoints, ","), firstEndpoint);
 
             // now, read the row back directly from the host owning the row locally
-            tester.setup(endpoints.get(0).getHostAddress(), DatabaseDescriptor.getRpcPort());
+            tester.setup(firstEndpoint.getHostAddress(), DatabaseDescriptor.getRpcPort());
             tester.thriftClient.set_keyspace(keyspace);
             Clock clock = new Clock();
             clock.setTimestamp(1);



Mime
View raw message