cassandra-commits mailing list archives

From jbel...@apache.org
Subject svn commit: r992475 - in /cassandra/trunk: CHANGES.txt NEWS.txt interface/cassandra.genavro ivy.xml src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
Date Fri, 03 Sep 2010 21:20:10 GMT
Author: jbellis
Date: Fri Sep  3 21:20:10 2010
New Revision: 992475

URL: http://svn.apache.org/viewvc?rev=992475&view=rev
Log:
support for Hadoop Streaming [non-jvm map/reduce via stdin/out].  patch by Stu Hood; reviewed by jbellis for CASSANDRA-1368

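For context, Hadoop Streaming runs the map and reduce steps as external
processes that exchange key/value pairs over stdin/stdout, one tab-separated
pair per line, which is what lets jobs be written outside the JVM. A toy
identity mapper in that style is sketched below; streaming executables may be
written in any language, and the class name here is purely illustrative.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    // Illustrative streaming mapper: reads "key<TAB>value" lines from
    // stdin and echoes them to stdout, where the streaming framework
    // routes them to the configured OutputFormat.
    public class IdentityStreamingMapper
    {
        public static void main(String[] args) throws IOException
        {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = in.readLine()) != null)
                System.out.println(line);
        }
    }
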
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/ivy.xml
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Sep  3 21:20:10 2010
@@ -47,6 +47,8 @@ dev
  * change multiget key collection from list to set (CASSANDRA-1329)
  * ability to modify keyspaces and column family definitions on a live cluster
    (CASSANDRA-1285)
+ * support for Hadoop Streaming [non-jvm map/reduce via stdin/out]
+   (CASSANDRA-1368)
 
 
 0.7-beta1

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Sep  3 21:20:10 2010
@@ -15,7 +15,8 @@ Features
     - Optional per-Column time-to-live field allows expiring data without
       having to issue explicit remove commands
     - `truncate` thrift method allows clearing an entire ColumnFamily at once
-    - Hadoop OutputFormat support
+    - Hadoop OutputFormat and Streaming [non-jvm map/reduce via stdin/out]
+      support
     - Up to 8x faster reads from row cache
     - A new ByteOrderedPartitioner supports bytes keys with arbitrary content,
       and orders keys by their byte value.  This should be used in new

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Fri Sep  3 21:20:10 2010
@@ -169,6 +169,11 @@ protocol Cassandra {
         array<CfDef> cf_defs;
     }
     
+    record StreamingMutation {
+        bytes key;
+        Mutation mutation;
+    }
+
     record MutationsMapEntry {
         bytes key;
         map<array<Mutation>> mutations;

Modified: cassandra/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/cassandra/trunk/ivy.xml?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/ivy.xml (original)
+++ cassandra/trunk/ivy.xml Fri Sep  3 21:20:10 2010
@@ -28,6 +28,7 @@
     <dependency org="commons-logging" name="commons-logging" rev="1.1.1"/>
     <dependency org="org.apache.rat" name="apache-rat" rev="0.6" />
     <dependency org="com.cloudera.hadoop" name="hadoop-core" rev="0.20.2-320"/>
+    <dependency org="com.cloudera.hadoop" name="hadoop-streaming" rev="0.20.2-320"/>
     <dependency org="net.sf.jopt-simple" name="jopt-simple" rev="3.2"/>
 
     <dependency org="net.java.dev.jna" name="jna" rev="3.2.7"/>

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java Fri Sep  3 21:20:10 2010
@@ -69,6 +69,7 @@ import org.slf4j.LoggerFactory;
  * </p>
  */
 public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
     
@@ -85,13 +86,17 @@ public class ColumnFamilyOutputFormat ex
     @Override
     public void checkOutputSpecs(JobContext context)
     {
-        Configuration conf = context.getConfiguration();
+        checkOutputSpecs(context.getConfiguration());
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
         if (ConfigHelper.getOutputKeyspace(conf) == null || ConfigHelper.getOutputColumnFamily(conf) == null)
         {
             throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setColumnFamily()");
         }
     }
-    
+
     /**
      * The OutputCommitter for this format does not write any data to the DFS.
      * 
@@ -107,6 +112,20 @@ public class ColumnFamilyOutputFormat ex
         return new NullOutputCommitter();
     }
     
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated @Override
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated @Override
+    public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
+    {
+        return new ColumnFamilyRecordWriter(job);
+    }
+
     /**
      * Get the {@link RecordWriter} for the given task.
      * 
@@ -116,7 +135,7 @@ public class ColumnFamilyOutputFormat ex
      * @throws IOException
      */
     @Override
-    public RecordWriter<ByteBuffer,List<Mutation>> getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    public ColumnFamilyRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordWriter(context);
     }
@@ -126,30 +145,29 @@ public class ColumnFamilyOutputFormat ex
      * keyspace, and is logged in with the configured credentials.
      *
      * @param socket  a socket pointing to a particular node, seed or otherwise
-     * @param context a job context
+     * @param conf a job configuration
      * @return a cassandra client
      * @throws InvalidRequestException
      * @throws TException
      * @throws AuthenticationException
      * @throws AuthorizationException
      */
-    public static Cassandra.Client createAuthenticatedClient(TSocket socket, JobContext context)
+    public static Cassandra.Client createAuthenticatedClient(TSocket socket, Configuration conf)
     throws InvalidRequestException, TException, AuthenticationException, AuthorizationException
     {
         TBinaryProtocol binaryProtocol = new TBinaryProtocol(new TFramedTransport(socket));
         Cassandra.Client client = new Cassandra.Client(binaryProtocol);
         socket.open();
-        client.set_keyspace(ConfigHelper.getOutputKeyspace(context.getConfiguration()));
-        if (ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()) != null)
+        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+        if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
         {
             Map<String, String> creds = new HashMap<String, String>();
-            creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(context.getConfiguration()));
-            creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(context.getConfiguration()));
+            creds.put(SimpleAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
+            creds.put(SimpleAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
             AuthenticationRequest authRequest = new AuthenticationRequest(creds);
             client.login(authRequest);
         }
         return client;
-
     }
 
     /**

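The change above makes a single class serve both Hadoop APIs: it extends the
new org.apache.hadoop.mapreduce.OutputFormat abstract class and implements the
deprecated org.apache.hadoop.mapred.OutputFormat interface that streaming
still drives. A minimal self-contained sketch of the same bridging pattern
follows; the class names and the "example.target" property are hypothetical.
Note how a covariant return type and a narrowed throws clause let one method
satisfy both APIs at once.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class DualApiOutputFormat extends OutputFormat<String, String>
        implements org.apache.hadoop.mapred.OutputFormat<String, String>
    {
        // Both entry points funnel into one check that needs only a
        // Configuration (JobConf extends Configuration, so the mapred path
        // can pass its JobConf straight through).
        private void checkOutputSpecs(Configuration conf)
        {
            if (conf.get("example.target") == null)
                throw new UnsupportedOperationException("example.target must be set");
        }

        // New (mapreduce) API.
        @Override
        public void checkOutputSpecs(JobContext context)
        {
            checkOutputSpecs(context.getConfiguration());
        }

        @Override
        public DualApiRecordWriter getRecordWriter(TaskAttemptContext context) throws IOException
        {
            return new DualApiRecordWriter();
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        {
            // No-op committer: this format writes nothing to the DFS.
            return new OutputCommitter()
            {
                public void setupJob(JobContext jobContext) {}
                public void cleanupJob(JobContext jobContext) {}
                public void setupTask(TaskAttemptContext taskContext) {}
                public boolean needsTaskCommit(TaskAttemptContext taskContext) { return false; }
                public void commitTask(TaskAttemptContext taskContext) {}
                public void abortTask(TaskAttemptContext taskContext) {}
            };
        }

        // Deprecated (mapred) API, the one Hadoop Streaming drives.
        @Deprecated @Override
        public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.mapred.JobConf job) throws IOException
        {
            checkOutputSpecs(job);
        }

        @Deprecated @Override
        public DualApiRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
        {
            return new DualApiRecordWriter();
        }
    }

    // The writer plays the same trick: extend the new abstract class,
    // implement the old interface. write() narrows the new API's throws
    // clause (dropping InterruptedException) so a single method satisfies
    // both write() signatures.
    class DualApiRecordWriter extends RecordWriter<String, String>
        implements org.apache.hadoop.mapred.RecordWriter<String, String>
    {
        @Override
        public void write(String key, String value) throws IOException
        {
            System.out.println(key + "\t" + value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException {}

        @Deprecated @Override
        public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException {}
    }
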
Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=992475&r1=992474&r2=992475&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Fri Sep  3 21:20:10 2010
@@ -47,6 +47,8 @@ import org.apache.cassandra.thrift.Slice
 import org.apache.cassandra.thrift.SliceRange;
 import org.apache.cassandra.thrift.SuperColumn;
 import org.apache.cassandra.utils.FBUtilities;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -75,10 +77,10 @@ import org.apache.thrift.transport.TSock
  * @see OutputFormat
  * 
  */
-final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>> implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<org.apache.cassandra.avro.Mutation>>
 {
-    // The task attempt context this writer is associated with.
-    private final TaskAttemptContext context;
+    // The configuration this writer is associated with.
+    private final Configuration conf;
     
     // The batched set of mutations grouped by endpoints.
     private Map<InetAddress,Map<byte[],Map<String,List<Mutation>>>> mutationsByEndpoint;
@@ -104,15 +106,20 @@ final class ColumnFamilyRecordWriter ext
      */
     ColumnFamilyRecordWriter(TaskAttemptContext context) throws IOException
     {
-        this.context = context;
-        this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
-        this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(context.getConfiguration()),
-                                       ConfigHelper.getPartitioner(context.getConfiguration()),
-                                       ConfigHelper.getInitialAddress(context.getConfiguration()),
-                                       ConfigHelper.getRpcPort(context.getConfiguration()));
-        this.batchThreshold = context.getConfiguration().getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
+        this(context.getConfiguration());
     }
     
+    ColumnFamilyRecordWriter(Configuration conf) throws IOException
+    {
+        this.conf = conf;
+        this.mutationsByEndpoint = new HashMap<InetAddress,Map<byte[],Map<String,List<Mutation>>>>();
+        this.ringCache = new RingCache(ConfigHelper.getOutputKeyspace(conf),
+                                       ConfigHelper.getPartitioner(conf),
+                                       ConfigHelper.getInitialAddress(conf),
+                                       ConfigHelper.getRpcPort(conf));
+        this.batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, Long.MAX_VALUE);
+    }
+
     /**
     * Return the endpoint responsible for the given key. The selected endpoint
     * is one whose token range contains the given key.
@@ -145,7 +152,7 @@ final class ColumnFamilyRecordWriter ext
      * @throws IOException
      */
     @Override
-    public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException, InterruptedException
+    public synchronized void write(ByteBuffer keybuff, List<org.apache.cassandra.avro.Mutation> value) throws IOException
     {
         maybeFlush();
         byte[] key = copy(keybuff);
@@ -164,11 +171,11 @@ final class ColumnFamilyRecordWriter ext
             mutationsByKey.put(key, cfMutation);
         }
 
-        List<Mutation> mutationList = cfMutation.get(ConfigHelper.getOutputColumnFamily(context.getConfiguration()));
+        List<Mutation> mutationList = cfMutation.get(ConfigHelper.getOutputColumnFamily(conf));
         if (mutationList == null)
         {
             mutationList = new ArrayList<Mutation>();
-            cfMutation.put(ConfigHelper.getOutputColumnFamily(context.getConfiguration()), mutationList);
+            cfMutation.put(ConfigHelper.getOutputColumnFamily(conf), mutationList);
         }
 
         for (org.apache.cassandra.avro.Mutation amut : value)
@@ -254,6 +261,13 @@ final class ColumnFamilyRecordWriter ext
         flush();
     }
 
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated @Override
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        flush();
+    }
+
     /**
      * Flush the mutations cache, iff more mutations have been cached than
      * {@link #batchThreshold}.
@@ -284,7 +298,7 @@ final class ColumnFamilyRecordWriter ext
             List<Future<?>> mutationFutures = new ArrayList<Future<?>>();
             for (Map.Entry<InetAddress, Map<byte[], Map<String, List<Mutation>>>> entry : mutationsByEndpoint.entrySet())
             {
-                mutationFutures.add(executor.submit(new EndpointCallable(context, entry.getKey(), entry.getValue())));
+                mutationFutures.add(executor.submit(new EndpointCallable(conf, entry.getKey(), entry.getValue())));
             }
             // wait until we have all the results back
             for (Future<?> mutationFuture : mutationFutures)
@@ -321,7 +335,7 @@ final class ColumnFamilyRecordWriter ext
     public class EndpointCallable implements Callable<Void>
     {
         // The task attempt context associated with this callable.
-        private TaskAttemptContext taskContext;
+        private Configuration conf;
         // The endpoint of the primary replica for the rows being mutated
         private InetAddress endpoint;
         // The mutations to be performed in the node referenced by {@link
@@ -332,13 +346,14 @@ final class ColumnFamilyRecordWriter ext
          * Constructs an {@link EndpointCallable} for the given endpoint and set
          * of mutations.
          *
+         * @param conf      job configuration
          * @param endpoint  the endpoint wherein to execute the mutations
          * @param mutations the mutation map expected by
          *                  {@link Cassandra.Client#batch_mutate(Map, ConsistencyLevel)}
          */
-        public EndpointCallable(TaskAttemptContext taskContext, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
+        public EndpointCallable(Configuration conf, InetAddress endpoint, Map<byte[], Map<String, List<Mutation>>> mutations)
         {
-            this.taskContext = taskContext;
+            this.conf = conf;
             this.endpoint = endpoint;
             this.mutations = mutations;
         }
@@ -352,8 +367,8 @@ final class ColumnFamilyRecordWriter ext
             TSocket socket = null;
             try
             {
-                socket = new TSocket(endpoint.getHostName(), ConfigHelper.getRpcPort(taskContext.getConfiguration()));
-                Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, taskContext);
+                socket = new TSocket(endpoint.getHostName(), ConfigHelper.getRpcPort(conf));
+                Cassandra.Client client = ColumnFamilyOutputFormat.createAuthenticatedClient(socket, conf);
                 client.batch_mutate(mutations, ConsistencyLevel.ONE);
                 return null;
             }



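Taken together, a JVM-side job would wire this format up roughly as follows.
This is a hypothetical sketch: the ConfigHelper setters are assumed to mirror
the getters used in the diff above (the error message in checkOutputSpecs()
points at setColumnFamily()), and the keyspace, column family, host, and port
values are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;

    public class CassandraOutputJob
    {
        public static void main(String[] args) throws Exception
        {
            Job job = new Job(new Configuration(), "write-to-cassandra");
            Configuration conf = job.getConfiguration();

            // Assumed setters mirroring the ConfigHelper getters used by
            // ColumnFamilyOutputFormat/ColumnFamilyRecordWriter; all values
            // below are placeholders.
            ConfigHelper.setColumnFamily(conf, "Keyspace1", "Standard1");
            ConfigHelper.setRpcPort(conf, "9160");
            ConfigHelper.setInitialAddress(conf, "localhost");
            ConfigHelper.setPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

            // The job's mapper/reducer should emit ByteBuffer keys and
            // List<Mutation> values for this output format.
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }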