cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r989370 - in /cassandra/trunk: src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/hadoop/
Date Wed, 25 Aug 2010 20:50:45 GMT
Author: jbellis
Date: Wed Aug 25 20:50:45 2010
New Revision: 989370

URL: http://svn.apache.org/viewvc?rev=989370&view=rev
Log:
Use Avro objects as input to CFOutputFormat.  patch by Stu Hood; reviewed by jbellis for CASSANDRA-1315

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=989370&r1=989369&r2=989370&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Wed
Aug 25 20:50:45 2010
@@ -25,10 +25,10 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.nio.ByteBuffer;
 
 import org.apache.cassandra.auth.SimpleAuthenticator;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.avro.Mutation;
 import org.apache.cassandra.thrift.AuthenticationException;
 import org.apache.cassandra.thrift.AuthenticationRequest;
 import org.apache.cassandra.thrift.AuthorizationException;
@@ -64,11 +64,11 @@ import org.slf4j.LoggerFactory;
  * <p>
  * For the sake of performance, this class employs a lazy write-back caching
  * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map). When the writer is closed, then it
- * makes the changes official by sending a batch mutate request to Cassandra.
+ * reduce's inputs (in a task-specific map), and periodically makes the changes
+ * official by sending a batch mutate request to Cassandra.
  * </p>
  */
-public class ColumnFamilyOutputFormat extends OutputFormat<byte[],List<IColumn>>
+public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
     
@@ -93,15 +93,7 @@ public class ColumnFamilyOutputFormat ex
     }
     
     /**
-     * Get the output committer for this output format. This is responsible for
-     * ensuring the output is committed correctly.
-     * 
-     * <p>
-     * This output format employs a lazy write-back caching mechanism, where the
-     * {@link RecordWriter} is responsible for collecting mutations in the
-     * {@link #MUTATIONS_CACHE}, and the {@link OutputCommitter} makes the
-     * changes official by making the change request to Cassandra.
-     * </p>
+     * The OutputCommitter for this format does not write any data to the DFS.
      * 
      * @param context
      *            the task context
@@ -118,19 +110,13 @@ public class ColumnFamilyOutputFormat ex
     /**
      * Get the {@link RecordWriter} for the given task.
      * 
-     * <p>
-     * As stated above, this {@link RecordWriter} merely batches the mutations
-     * that it defines in the {@link #MUTATIONS_CACHE}. In other words, it
-     * doesn't literally cause any changes on the Cassandra server.
-     * </p>
-     * 
      * @param context
      *            the information about the current task.
      * @return a {@link RecordWriter} to write the output for the job.
      * @throws IOException
      */
     @Override
-    public RecordWriter<byte[],List<IColumn>> getRecordWriter(final TaskAttemptContext
context) throws IOException, InterruptedException
+    public RecordWriter<ByteBuffer,List<Mutation>> getRecordWriter(final TaskAttemptContext
context) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordWriter(context);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=989370&r1=989369&r2=989370&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Wed
Aug 25 20:50:45 2010
@@ -22,10 +22,12 @@ package org.apache.cassandra.hadoop;
  */
 import java.io.IOException;
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -33,7 +35,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.db.IColumn;
+import static org.apache.cassandra.io.SerDeUtils.copy;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.Clock;
 import org.apache.cassandra.thrift.Column;
@@ -43,6 +45,8 @@ import org.apache.cassandra.thrift.Delet
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -50,31 +54,28 @@ import org.apache.thrift.transport.TSock
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it creates mutations for
- * each column in the value, which it associates with the key, and in turn the
- * responsible endpoint.
+ * pairs to a Cassandra column family. In particular, it applies all mutations
+ * in the value, which it associates with the key, and in turn the responsible
+ * endpoint.
  * 
  * <p>
  * Note that, given that round trips to the server are fairly expensive, it
- * merely batches the mutations in-memory (specifically in
- * {@link ColumnFamilyOutputFormat#MUTATIONS_CACHE}), and leaves it to the
- * {@link ColumnFamilyOutputCommitter} to send the batched mutations to the
- * server in one shot.
+ * merely batches the mutations in-memory and periodically sends the batched
+ * mutations to the server in one shot.
  * </p>
  * 
  * <p>
  * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the {@link ColumnFamilyOutputCommitter}
- * to execute the mutations in parallel, on a endpoint-by-endpoint basis.
+ * the rows being affected. This allows the mutations to be executed in parallel,
+ * directly to a responsible endpoint.
  * </p>
  * 
  * @author Karthick Sankarachary
- * @see ColumnFamilyOutputCommitter
  * @see ColumnFamilyOutputFormat
  * @see OutputFormat
  * 
  */
-final class ColumnFamilyRecordWriter extends RecordWriter<byte[],List<IColumn>>
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
 {
     // The task attempt context this writer is associated with.
     private final TaskAttemptContext context;
@@ -87,12 +88,12 @@ final class ColumnFamilyRecordWriter ext
     // the endpoints they should be targeted at. The targeted endpoint
     // essentially
     // acts as the primary replica for the rows being affected by the mutations.
-    private RingCache ringCache;
+    private final RingCache ringCache;
     
     // The number of mutations currently held in the mutations cache.
     private long batchSize = 0L;
     // The maximum number of mutations to hold in the mutations cache.
-    private long batchThreshold = Long.MAX_VALUE;
+    private final long batchThreshold;
     
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -144,75 +145,92 @@ final class ColumnFamilyRecordWriter ext
      * @throws IOException
      */
     @Override
-    public synchronized void write(byte[] key, List<IColumn> value) throws IOException,
InterruptedException
+    public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation>
value) throws IOException, InterruptedException
     {
         maybeFlush();
+        byte[] key = copy(keybuff);
         InetAddress endpoint = getEndpoint(key);
         Map<byte[], Map<String, List<Mutation>>> mutationsByKey = mutationsByEndpoint.get(endpoint);
         if (mutationsByKey == null)
         {
-            mutationsByKey = new HashMap<byte[], Map<String, List<Mutation>>>();
+            mutationsByKey = new TreeMap<byte[], Map<String, List<Mutation>>>(FBUtilities.byteArrayComparator);
             mutationsByEndpoint.put(endpoint, mutationsByKey);
         }
 
         Map<String, List<Mutation>> cfMutation = new HashMap<String, List<Mutation>>();
         mutationsByKey.put(key, cfMutation);
 
-        Clock clock = new Clock(System.currentTimeMillis());
         List<Mutation> mutationList = new ArrayList<Mutation>();
         cfMutation.put(ConfigHelper.getOutputColumnFamily(context.getConfiguration()), mutationList);
 
-        if (value == null)
+        for (org.apache.cassandra.avro.Mutation amut : value)
+            mutationList.add(avroToThrift(amut));
+    }
+
+    /**
+     * Deep copies the given Avro mutation into a new Thrift mutation.
+     */
+    private Mutation avroToThrift(org.apache.cassandra.avro.Mutation amut)
+    {
+        Mutation mutation = new Mutation();
+        org.apache.cassandra.avro.ColumnOrSuperColumn acosc = amut.column_or_supercolumn;
+        if (acosc != null)
         {
-            Mutation mutation = new Mutation();
-            Deletion deletion = new Deletion(clock);
-            mutation.setDeletion(deletion);
-            mutationList.add(mutation);
+            // creation
+            ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+            mutation.setColumn_or_supercolumn(cosc);
+            if (acosc.column != null)
+                // standard column
+                cosc.setColumn(avroToThrift(acosc.column));
+            else
+            {
+                // super column
+                byte[] scolname = copy(acosc.super_column.name);
+                List<Column> scolcols = new ArrayList<Column>((int)acosc.super_column.columns.size());
+                for (org.apache.cassandra.avro.Column acol : acosc.super_column.columns)
+                    scolcols.add(avroToThrift(acol));
+                cosc.setSuper_column(new SuperColumn(scolname, scolcols));
+            }
         }
         else
         {
-            List<byte[]> columnsToDelete = new ArrayList<byte[]>();
-            for (IColumn column : value)
+            // deletion
+            Deletion deletion = new Deletion(avroToThrift(amut.deletion.clock));
+            mutation.setDeletion(deletion);
+            org.apache.cassandra.avro.SlicePredicate apred = amut.deletion.predicate;
+            if (amut.deletion.super_column != null)
+                // super column
+                deletion.setSuper_column(copy(amut.deletion.super_column));
+            else if (apred.column_names != null)
             {
-                Mutation mutation = new Mutation();
-                if (column.value() == null)
-                {
-                    if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
-                    {
-                        if (column.name() == null)
-                            columnsToDelete.clear();
-                        columnsToDelete.add(column.name());
-                    }
-                }
-                else
-                {
-
-                    ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
-                    cosc.setColumn(new Column(column.name(), column.value(), clock));
-                    mutation.setColumn_or_supercolumn(cosc);
-                }
-                mutationList.add(mutation);
+                // column names
+                List<byte[]> colnames = new ArrayList<byte[]>((int)apred.column_names.size());
+                for (ByteBuffer acolname : apred.column_names)
+                    colnames.add(copy(acolname));
+                deletion.setPredicate(new SlicePredicate().setColumn_names(colnames));
             }
-
-            if (columnsToDelete.size() > 0)
+            else
             {
-                Mutation mutation = new Mutation();
-                Deletion deletion = new Deletion(clock);
-
-                if (columnsToDelete.size() != 1 || columnsToDelete.get(0) != null)
-                {
-                    deletion.setPredicate(new SlicePredicate().setColumn_names(columnsToDelete));
-                }
-                else
-                {
-                    SliceRange range = new SliceRange(new byte[]{ }, new byte[]{ }, false,
Integer.MAX_VALUE);
-                    deletion.setPredicate(new SlicePredicate().setSlice_range(range));
-                }
-
-                mutation.setDeletion(deletion);
-                mutationList.add(mutation);
+                // range
+                deletion.setPredicate(new SlicePredicate().setSlice_range(avroToThrift(apred.slice_range)));
             }
         }
+        return mutation;
+    }
+
+    private SliceRange avroToThrift(org.apache.cassandra.avro.SliceRange asr)
+    {
+        return new SliceRange(copy(asr.start), copy(asr.finish), asr.reversed, asr.count);
+    }
+
+    private Column avroToThrift(org.apache.cassandra.avro.Column acol)
+    {
+        return new Column(copy(acol.name), copy(acol.value), avroToThrift(acol.clock));
+    }
+
+    private Clock avroToThrift(org.apache.cassandra.avro.Clock aclo)
+    {
+        return new Clock(aclo.timestamp);
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java?rev=989370&r1=989369&r2=989370&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java Wed Aug 25 20:50:45 2010
@@ -46,6 +46,14 @@ public final class SerDeUtils
     // unbuffered decoders
     private final static DecoderFactory DIRECT_DECODERS = new DecoderFactory().configureDirectDecoder(true);
 
+    public static byte[] copy(ByteBuffer buff)
+    {
+        byte[] bytes = new byte[buff.remaining()];
+        buff.get(bytes);
+        buff.rewind();
+        return bytes;
+    }
+
 	/**
      * Deserializes a single object based on the given Schema.
      * @param writer writer's schema

Modified: cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java?rev=989370&r1=989369&r2=989370&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/hadoop/SampleColumnFamilyOutputTool.java
Wed Aug 25 20:50:45 2010
@@ -68,6 +68,7 @@ public class SampleColumnFamilyOutputToo
         job.setMapOutputValueClass(ColumnWritable.class);
         job.setInputFormatClass(SequenceFileInputFormat.class);
         
+        // TODO: no idea why this test is passing
         job.setReducerClass(ColumnFamilyOutputReducer.class);
         job.setOutputKeyClass(byte[].class);
         job.setOutputValueClass(SortedMap.class);
@@ -76,4 +77,4 @@ public class SampleColumnFamilyOutputToo
         job.waitForCompletion(true);
         return 0;
     }
-}
\ No newline at end of file
+}



Mime
View raw message