cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r919583 - in /incubator/cassandra/branches/cassandra-0.6: contrib/word_count/src/WordCount.java src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
Date Fri, 05 Mar 2010 20:16:52 GMT
Author: jbellis
Date: Fri Mar  5 20:16:52 2010
New Revision: 919583

URL: http://svn.apache.org/viewvc?rev=919583&view=rev
Log:
move configuration static methods into ConfigHelper.  patch by jbellis; reviewed by johano
for CASSANDRA-837

Modified:
    incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java

Modified: incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java?rev=919583&r1=919582&r2=919583&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/contrib/word_count/src/WordCount.java Fri Mar  5 20:16:52 2010
@@ -25,6 +25,7 @@
 
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -127,9 +128,9 @@
             job.setInputFormatClass(ColumnFamilyInputFormat.class);
             FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
 
-            ColumnFamilyInputFormat.setColumnFamily(job, KEYSPACE, COLUMN_FAMILY);
+            ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
             SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
-            ColumnFamilyInputFormat.setSlicePredicate(job, predicate);
+            ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
 
             job.waitForCompletion(true);
         }

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=919583&r1=919582&r2=919583&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Mar  5 20:16:52 2010
@@ -22,38 +22,23 @@
 
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.log4j.Logger;
-import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 public class ColumnFamilyInputFormat extends InputFormat<String, SortedMap<byte[], IColumn>>
 {
-    private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
-    private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
-    private static final String PREDICATE_CONFIG = "cassandra.input.predicate";
-    private static final String INPUT_SPLIT_SIZE_CONFIG = "cassandra.input.split.size";
 
     private static final Logger logger = Logger.getLogger(StorageService.class);
 
@@ -61,79 +46,6 @@
     private String columnFamily;
     private SlicePredicate predicate;
 
-    public static void setColumnFamily(Job job, String keyspace, String columnFamily)
-    {
-        if (keyspace == null)
-        {
-            throw new UnsupportedOperationException("keyspace may not be null");
-        }
-        if (columnFamily == null)
-        {
-            throw new UnsupportedOperationException("columnfamily may not be null");
-        }
-        try
-        {
-            ThriftValidation.validateColumnFamily(keyspace, columnFamily);
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
-        }
-        Configuration conf = job.getConfiguration();
-        conf.set(KEYSPACE_CONFIG, keyspace);
-        conf.set(COLUMNFAMILY_CONFIG, columnFamily);
-    }
-
-    /**
-     * Set the size of the input split.
-     * This affects the number of maps created, if the number is too small
-     * the overhead of each map will take up the bulk of the job time.
-     *  
-     * @param job Job you are about to run.
-     * @param splitsize Size of the input split
-     */
-    public static void setInputSplitSize(Job job, int splitsize)
-    {
-        job.getConfiguration().setInt(INPUT_SPLIT_SIZE_CONFIG, splitsize);
-    }
-    
-    public static void setSlicePredicate(Job job, SlicePredicate predicate)
-    {
-        Configuration conf = job.getConfiguration();
-        conf.set(PREDICATE_CONFIG, predicateToString(predicate));
-    }
-
-    private static String predicateToString(SlicePredicate predicate)
-    {
-        assert predicate != null;
-        // this is so awful it's kind of cool!
-        TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
-        try
-        {
-            return serializer.toString(predicate, "UTF-8");
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private SlicePredicate predicateFromString(String st)
-    {
-        assert st != null;
-        TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
-        SlicePredicate predicate = new SlicePredicate();
-        try
-        {
-            deserializer.deserialize(predicate, st, "UTF-8");
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return predicate;
-    }
-
     private void validateConfiguration()
     {
         if (keyspace == null || columnFamily == null)
@@ -149,15 +61,15 @@
     public List<InputSplit> getSplits(JobContext context) throws IOException
     {
         Configuration conf = context.getConfiguration();
-        keyspace = conf.get(KEYSPACE_CONFIG);
-        columnFamily = conf.get(COLUMNFAMILY_CONFIG);
-        predicate = predicateFromString(conf.get(PREDICATE_CONFIG));
+        predicate = ConfigHelper.getSlicePredicate(conf);
+        keyspace = ConfigHelper.getKeyspace(conf);
+        columnFamily = ConfigHelper.getColumnFamily(conf);
         validateConfiguration();
 
         // cannonical ranges and nodes holding replicas
         List<TokenRange> masterRangeNodes = getRangeMap();
 
-        int splitsize = context.getConfiguration().getInt(INPUT_SPLIT_SIZE_CONFIG, 16384);
+        int splitsize = ConfigHelper.getInputSplitSize(context.getConfiguration());
         
         // cannonical ranges, split into pieces:
         // for each range, pick a live owner and ask it to compute bite-sized splits



Mime
View raw message