cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1152782 - in /cassandra/branches/cassandra-0.8: ./ examples/hadoop_word_count/ examples/hadoop_word_count/bin/ examples/hadoop_word_count/src/ src/java/org/apache/cassandra/hadoop/
Date Mon, 01 Aug 2011 14:05:35 GMT
Author: jbellis
Date: Mon Aug  1 14:05:34 2011
New Revision: 1152782

URL: http://svn.apache.org/viewvc?rev=1152782&view=rev
Log:
add counter support to Hadoop InputFormat
patch by Aaron Morton; reviewed by jbellis for CASSANDRA-2981

Added:
    cassandra/branches/cassandra-0.8/examples/hadoop_word_count/bin/word_count_counters
    cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountCounters.java
Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/examples/hadoop_word_count/README.txt
    cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountSetup.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1152782&r1=1152781&r2=1152782&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Aug  1 14:05:34 2011
@@ -16,6 +16,7 @@
  * Flush memtables on shutdown when durable writes are disabled 
    (CASSANDRA-2958)
  * improved POSIX compatibility of start scripts (CASsANDRA-2965)
+ * add counter support to Hadoop InputFormat (CASSANDRA-2981)
 
 
 0.8.2

Modified: cassandra/branches/cassandra-0.8/examples/hadoop_word_count/README.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/examples/hadoop_word_count/README.txt?rev=1152782&r1=1152781&r2=1152782&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/examples/hadoop_word_count/README.txt (original)
+++ cassandra/branches/cassandra-0.8/examples/hadoop_word_count/README.txt Mon Aug  1 14:05:34 2011
@@ -1,5 +1,6 @@
 WordCount hadoop example: Inserts a bunch of words across multiple rows,
-and counts them, with RandomPartitioner.
+and counts them, with RandomPartitioner. The word_count_counters example sums
+the value of counter columns for a key.
 
 The scripts in bin/ assume you are running with cwd of contrib/word_count.
 
@@ -9,6 +10,7 @@ then run
 contrib/word_count$ ant
 contrib/word_count$ bin/word_count_setup
 contrib/word_count$ bin/word_count
+contrib/word_count$ bin/word_count_counters
 
 In order to view the results in Cassandra, one can use bin/cassandra-cli and
 perform the following operations:
@@ -25,5 +27,8 @@ in the 'wordcount' keyspace.  'cassandra
 
 Read the code in src/ for more details.
 
+The word_count_counters example sums the counter columns for a row. The output
+is written to a text file in /tmp/word_count_counters.
+
 *If you want to point wordcount at a real cluster, modify the seed
 and listenaddress settings accordingly.

Added: cassandra/branches/cassandra-0.8/examples/hadoop_word_count/bin/word_count_counters
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/examples/hadoop_word_count/bin/word_count_counters?rev=1152782&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/examples/hadoop_word_count/bin/word_count_counters (added)
+++ cassandra/branches/cassandra-0.8/examples/hadoop_word_count/bin/word_count_counters Mon Aug  1 14:05:34 2011
@@ -0,0 +1,58 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cwd=`dirname $0`
+
+# Cassandra class files.
+if [ ! -d $cwd/../../../build/classes/main ]; then
+    echo "Unable to locate cassandra class files" >&2
+    exit 1
+fi
+
+# word_count Jar.
+if [ ! -e $cwd/../build/word_count.jar ]; then
+    echo "Unable to locate word_count jar" >&2
+    exit 1
+fi
+
+CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
+for jar in $cwd/../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../lib/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+#echo $CLASSPATH
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters

Added: cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountCounters.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountCounters.java?rev=1152782&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountCounters.java (added)
+++ cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountCounters.java Mon Aug  1 14:05:34 2011
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * This sums the word count stored in the input_words_count ColumnFamily for the key "key-if-verse1".
+ *
+ * Output is written to a text file.
+ */
+public class WordCountCounters extends Configured implements Tool
+{
+    private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);
+
+    static final String COUNTER_COLUMN_FAMILY = "input_words_count";
+    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters";
+
+
+    public static void main(String[] args) throws Exception
+    {
+        // Let ToolRunner handle generic command-line options
+        ToolRunner.run(new Configuration(), new WordCountCounters(), args);
+        System.exit(0);
+    }
+
+    public static class SumMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
+    {
+        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
+        {
+            long sum = 0;
+            for (IColumn column : columns.values())
+            {
+                logger.debug("read " + key + ":" + column.name() + " from " + context.getInputSplit());
+                sum += ByteBufferUtil.toLong(column.value());
+            }
+            context.write(new Text(ByteBufferUtil.string(key)), new LongWritable(sum));
+        }
+    }
+
+    public int run(String[] args) throws Exception
+    {
+        Job job = new Job(getConf(), "wordcountcounters");
+        job.setJarByClass(WordCountCounters.class);
+        job.setMapperClass(SumMapper.class);
+
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
+
+
+        job.setInputFormatClass(ColumnFamilyInputFormat.class);
+
+
+        ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
+        ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
+        ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
+        ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY);
+        SlicePredicate predicate = new SlicePredicate().setSlice_range(
+                                                                        new SliceRange().
+                                                                        setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER).
+                                                                        setFinish(ByteBufferUtil.EMPTY_BYTE_BUFFER).
+                                                                        setCount(100));
+        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
+
+        job.waitForCompletion(true);
+        return 0;
+    }
+}

Modified: cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountSetup.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountSetup.java?rev=1152782&r1=1152781&r2=1152782&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountSetup.java (original)
+++ cassandra/branches/cassandra-0.8/examples/hadoop_word_count/src/WordCountSetup.java Mon Aug  1 14:05:34 2011
@@ -81,6 +81,19 @@ public class WordCountSetup
         client.batch_mutate(mutationMap, ConsistencyLevel.ONE);
         logger.info("added text3");
 
+        // sentence data for the counters
+        final ByteBuffer key = ByteBufferUtil.bytes("key-if-verse1");
+        final ColumnParent colParent = new ColumnParent(WordCountCounters.COUNTER_COLUMN_FAMILY);
+        for (String sentence : sentenceData())
+        {
+            client.add(key,
+                       colParent,
+                       new CounterColumn(ByteBufferUtil.bytes(sentence),
+                       (long)sentence.split("\\s").length),
+                       ConsistencyLevel.ONE );
+        }
+        logger.info("added key-if-verse1");
+
         System.exit(0);
     }
 
@@ -115,6 +128,10 @@ public class WordCountSetup
         output.setComparator_type("AsciiType");
         output.setDefault_validation_class("AsciiType");
         cfDefList.add(output);
+        CfDef counterInput = new CfDef(WordCount.KEYSPACE, WordCountCounters.COUNTER_COLUMN_FAMILY);
+        counterInput.setComparator_type("UTF8Type");
+        counterInput.setDefault_validation_class("CounterColumnType");
+        cfDefList.add(counterInput);
 
         KsDef ksDef = new KsDef(WordCount.KEYSPACE, "org.apache.cassandra.locator.SimpleStrategy", cfDefList);
         ksDef.putToStrategy_options("replication_factor", "1");
@@ -150,4 +167,18 @@ public class WordCountSetup
 
         return new Cassandra.Client(protocol);
     }
+
+    private static String[] sentenceData()
+    {   // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94
+        return new String[]{
+            "If you can keep your head when all about you",
+            "Are losing theirs and blaming it on you",
+            "If you can trust yourself when all men doubt you,",
+            "But make allowance for their doubting too:",
+            "If you can wait and not be tired by waiting,",
+            "Or being lied about, don’t deal in lies,",
+            "Or being hated, don’t give way to hating,",
+            "And yet don’t look too good, nor talk too wise;"
+        };
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=1152782&r1=1152781&r2=1152782&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Mon Aug  1 14:05:34 2011
@@ -31,11 +31,15 @@ import com.google.common.collect.Abstrac
 
 import org.apache.cassandra.auth.SimpleAuthenticator;
 import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.hadoop.conf.Configuration;
@@ -293,8 +297,13 @@ public class ColumnFamilyRecordReader ex
 
         private IColumn unthriftify(ColumnOrSuperColumn cosc)
         {
-            if (cosc.column == null)
+            if (cosc.counter_column != null)
+                return unthriftifyCounter(cosc.counter_column);
+            if (cosc.counter_super_column != null)
+                return unthriftifySuperCounter(cosc.counter_super_column);
+            if (cosc.super_column != null)
                 return unthriftifySuper(cosc.super_column);
+            assert cosc.column != null;
             return unthriftifySimple(cosc.column);
         }
 
@@ -312,5 +321,20 @@ public class ColumnFamilyRecordReader ex
         {
             return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
         }
+
+        private IColumn unthriftifyCounter(CounterColumn column)
+        {
+            //CounterColumns read the nodeID from the System table, so need the StorageService running and access
+            //to cassandra.yaml. To avoid a Hadoop needing access to yaml return a regular Column.
+            return new org.apache.cassandra.db.Column(column.name, ByteBufferUtil.bytes(column.value), 0);
+        }
+
+        private IColumn unthriftifySuperCounter(CounterSuperColumn superColumn)
+        {
+            org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(superColumn.name, subComparator);
+            for (CounterColumn column : superColumn.columns)
+                sc.addColumn(unthriftifyCounter(column));
+            return sc;
+        }
     }
 }



Mime
View raw message