cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r909622 - in /incubator/cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/hadoop/
Date Fri, 12 Feb 2010 21:53:16 GMT
Author: jbellis
Date: Fri Feb 12 21:53:15 2010
New Revision: 909622

URL: http://svn.apache.org/viewvc?rev=909622&view=rev
Log:
add basic hadoop support using Thrift, one split per node
patch by jbellis; reviewed by Stu Hood for CASSANDRA-342

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
  (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
  (with props)
    incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
  (with props)
Modified:
    incubator/cassandra/trunk/ivy.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java

Modified: incubator/cassandra/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/ivy.xml?rev=909622&r1=909621&r2=909622&view=diff
==============================================================================
--- incubator/cassandra/trunk/ivy.xml (original)
+++ incubator/cassandra/trunk/ivy.xml Fri Feb 12 21:53:15 2010
@@ -19,6 +19,8 @@
 <ivy-module version="2.0">
   <info organisation="apache-cassandra" module="cassandra"/>
   <dependencies>
+    <dependency org="org.apache.mahout.hadoop"
+                name="hadoop-core" rev="0.20.1"/>
     <!-- FIXME: paranamer and jackson can be dropped after we're depending
     on avro (since it depends on them). -->
     <dependency org="com.thoughtworks.paranamer"

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=909622&r1=909621&r2=909622&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Fri Feb 12 21:53:15 2010
@@ -47,7 +47,7 @@
     private AtomicInteger localDeletionTime = new AtomicInteger(Integer.MIN_VALUE);
 	private AtomicLong markedForDeleteAt = new AtomicLong(Long.MIN_VALUE);
 
-    SuperColumn(byte[] name, AbstractType comparator)
+    public SuperColumn(byte[] name, AbstractType comparator)
     {
         this(name, new ConcurrentSkipListMap<byte[], IColumn>(comparator));
     }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=909622&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Fri Feb 12 21:53:15 2010
@@ -0,0 +1,112 @@
+package org.apache.cassandra.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+/**
+ * Hadoop {@link InputFormat} over a single Cassandra column family.
+ * Records are keyed by a String and carry a sorted map of column name to
+ * {@link IColumn} as their value.
+ *
+ * The keyspace and column family to read are stored in the job configuration
+ * by {@link #setColumnFamily(Job, String, String)} and read back in
+ * {@link #getSplits(JobContext)}.
+ */
+public class ColumnFamilyInputFormat extends InputFormat<String, SortedMap<byte[], IColumn>>
+{
+    private static final String KEYSPACE_CONFIG = "cassandra.input.keyspace";
+    private static final String COLUMNFAMILY_CONFIG = "cassandra.input.columnfamily";
+
+    // log under this class's own name so messages are attributed correctly
+    private static final Logger logger = Logger.getLogger(ColumnFamilyInputFormat.class);
+
+    // populated from the job configuration each time getSplits() runs
+    private String keyspace;
+    private String columnFamily;
+
+    /**
+     * Records which keyspace and column family the job should read.
+     *
+     * @param job          job whose configuration receives the two settings
+     * @param keyspace     keyspace name; must not be null
+     * @param columnFamily column family name; must not be null
+     * @throws RuntimeException if either name is null, or if
+     *                          ThriftValidation rejects the pair (the
+     *                          InvalidRequestException is attached as the cause)
+     */
+    public static void setColumnFamily(Job job, String keyspace, String columnFamily)
+    {
+        validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+        try
+        {
+            ThriftValidation.validateColumnFamily(keyspace, columnFamily);
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e);
+        }
+        Configuration conf = job.getConfiguration();
+        conf.set(KEYSPACE_CONFIG, keyspace);
+        conf.set(COLUMNFAMILY_CONFIG, columnFamily);
+    }
+
+    /**
+     * Fails fast when either setting is missing.  Both values are supplied
+     * through setColumnFamily(), so both messages point the user there.
+     */
+    private static void validateNotNullKeyspaceAndColumnFamily(String keyspace, String columnFamily)
+    {
+        if (keyspace == null)
+        {
+            throw new RuntimeException("you forgot to set the keyspace with setColumnFamily()");
+        }
+        if (columnFamily == null)
+        {
+            throw new RuntimeException("you forgot to set the column family with setColumnFamily()");
+        }
+    }
+
+    /**
+     * Builds one {@link ColumnFamilySplit} for every TokenRange returned by
+     * the cluster's describe_ring call.
+     *
+     * @param context supplies the job configuration holding the keyspace and
+     *                column family names
+     * @return the splits, in the order the ranges were returned
+     * @throws IOException if the connection used to describe the ring cannot be opened
+     * @throws RuntimeException if the keyspace or column family is not configured
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext context) throws IOException
+    {
+        Configuration conf = context.getConfiguration();
+        keyspace = conf.get(KEYSPACE_CONFIG);
+        columnFamily = conf.get(COLUMNFAMILY_CONFIG);
+        validateNotNullKeyspaceAndColumnFamily(keyspace, columnFamily);
+
+        List<TokenRange> ranges = getRangeMap();
+        List<InputSplit> splits = new ArrayList<InputSplit>(ranges.size());
+        for (TokenRange range : ranges)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("split range is [" + range.start_token + ", " + range.end_token + "]");
+            String[] endpoints = range.endpoints.toArray(new String[0]);
+            splits.add(new ColumnFamilySplit(keyspace, columnFamily, range.start_token, range.end_token, endpoints));
+        }
+
+        return splits;
+    }
+
+    /**
+     * Asks the first seed listed by DatabaseDescriptor, over Thrift, for the
+     * token ranges of the configured keyspace.
+     *
+     * @throws IOException if the socket cannot be opened
+     * @throws RuntimeException if the describe_ring call itself fails
+     */
+    private List<TokenRange> getRangeMap() throws IOException
+    {
+        TSocket socket = new TSocket(DatabaseDescriptor.getSeeds().iterator().next().getHostAddress(),
+                                     DatabaseDescriptor.getThriftPort());
+        TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        try
+        {
+            socket.open();
+        }
+        catch (TTransportException e)
+        {
+            throw new IOException(e);
+        }
+        try
+        {
+            return client.describe_ring(keyspace);
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+        finally
+        {
+            // the connection is only needed for this one call; release it
+            // whether or not the call succeeded
+            socket.close();
+        }
+    }
+
+    /**
+     * Returns a fresh {@link ColumnFamilyRecordReader}.  Neither argument is
+     * used here.
+     */
+    @Override
+    public RecordReader<String, SortedMap<byte[], IColumn>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
+    {
+        return new ColumnFamilyRecordReader();
+    }
+}
\ No newline at end of file

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java?rev=909622&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java Fri Feb 12 21:53:15 2010
@@ -0,0 +1,196 @@
+package org.apache.cassandra.hadoop;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.ArrayUtils;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransportException;
+
+public class ColumnFamilyRecordReader extends RecordReader<String, SortedMap<byte[],
IColumn>>
+{
+    private static final int ROWS_PER_RANGE_QUERY = 1024;
+
+    private ColumnFamilySplit split;
+    private RowIterator iter;
+    private Pair<String, SortedMap<byte[], IColumn>> currentRow;
+
+    public void close() {}
+    
+    public String getCurrentKey()
+    {
+        return currentRow.left;
+    }
+
+    public SortedMap<byte[], IColumn> getCurrentValue()
+    {
+        return currentRow.right;
+    }
+    
+    public float getProgress()
+    {
+        return ((float)iter.rowsRead()) / iter.size();
+    }
+    
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException
+    {
+        this.split = (ColumnFamilySplit) split;
+        iter = new RowIterator();
+    }
+    
+    public boolean nextKeyValue() throws IOException
+    {
+        if (!iter.hasNext())
+            return false;
+        currentRow = iter.next();
+        return true;
+    }
+
+    private class RowIterator extends AbstractIterator<Pair<String, SortedMap<byte[],
IColumn>>>
+    {
+
+        private List<KeySlice> rows;
+        private int i = 0;
+        private AbstractType comparator = DatabaseDescriptor.getComparator(split.getTable(),
split.getColumnFamily());
+
+        private void maybeInit()
+        {
+            if (rows != null)
+                return;
+            TSocket socket = new TSocket(getLocation(),
+                                         DatabaseDescriptor.getThriftPort());
+            TBinaryProtocol binaryProtocol = new TBinaryProtocol(socket, false, false);
+            Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+            try
+            {
+                socket.open();
+            }
+            catch (TTransportException e)
+            {
+                throw new RuntimeException(e);
+            }
+            SliceRange sliceRange = new SliceRange(ArrayUtils.EMPTY_BYTE_ARRAY,
+                                                   ArrayUtils.EMPTY_BYTE_ARRAY,
+                                                   false,
+                                                   Integer.MAX_VALUE);
+            KeyRange keyRange = new KeyRange(ROWS_PER_RANGE_QUERY)
+                                .setStart_token(split.getStartToken())
+                                .setEnd_token(split.getEndToken());
+            // TODO "paging" large rows would be good
+            try
+            {
+                rows = client.get_range_slices(split.getTable(),
+                                               new ColumnParent(split.getColumnFamily()),
+                                               new SlicePredicate().setSlice_range(sliceRange),
+                                               keyRange,
+                                               ConsistencyLevel.ONE);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // we don't use endpointsnitch since we are trying to support hadoop nodes that are
+        // not necessarily on Cassandra machines, too.  This should be adequate for single-DC
clusters, at least.
+        private String getLocation()
+        {
+            InetAddress[] localAddresses = new InetAddress[0];
+            try
+            {
+                localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());
+            }
+            catch (UnknownHostException e)
+            {
+                throw new AssertionError(e);
+            }
+            for (InetAddress address : localAddresses)
+            {
+                for (String location : split.getLocations())
+                {
+                    InetAddress locationAddress = null;
+                    try
+                    {
+                        locationAddress = InetAddress.getByName(location);
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new AssertionError(e);
+                    }
+                    if (address.equals(locationAddress))
+                    {
+                        return location;
+                    }
+                }
+            }
+            return split.getLocations()[0];
+        }
+
+        public int size()
+        {
+            maybeInit();
+            return rows.size();
+        }
+
+        public int rowsRead()
+        {
+            return i;
+        }
+
+        @Override
+        protected Pair<String, SortedMap<byte[], IColumn>> computeNext()
+        {
+            maybeInit();
+            if (i == rows.size())
+                return endOfData();
+            KeySlice ks = rows.get(i++);
+            SortedMap<byte[], IColumn> map = new TreeMap<byte[], IColumn>(comparator);
+            for (ColumnOrSuperColumn cosc : ks.columns)
+            {
+                IColumn column = unthriftify(cosc);
+                map.put(column.name(), column);
+            }
+            return new Pair<String, SortedMap<byte[], IColumn>>(ks.key, map);
+        }
+    }
+
+    private IColumn unthriftify(ColumnOrSuperColumn cosc)
+    {
+        if (cosc.column == null)
+            return unthriftifySuper(cosc.super_column);
+        return unthriftifySimple(cosc.column);
+    }
+
+    private IColumn unthriftifySuper(SuperColumn super_column)
+    {
+        AbstractType subComparator = DatabaseDescriptor.getSubComparator(split.getTable(),
split.getColumnFamily());
+        org.apache.cassandra.db.SuperColumn sc = new org.apache.cassandra.db.SuperColumn(super_column.name,
subComparator);
+        for (Column column : super_column.columns)
+        {
+            sc.addColumn(unthriftifySimple(column));
+        }
+        return sc;
+    }
+
+    private IColumn unthriftifySimple(Column column)
+    {
+        return new org.apache.cassandra.db.Column(column.name, column.value, column.timestamp);
+    }
+}
\ No newline at end of file

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java?rev=909622&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java Fri Feb 12 21:53:15 2010
@@ -0,0 +1,102 @@
+package org.apache.cassandra.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * An {@link InputSplit} describing one token range of a column family,
+ * together with the nodes reported as its locations.  Implements
+ * {@link Writable}, serializing every field with DataOutput/DataInput.
+ */
+public class ColumnFamilySplit extends InputSplit implements Writable
+{
+    private String startToken;
+    private String endToken;
+    private String table;
+    private String columnFamily;
+    private String[] dataNodes;
+
+    /**
+     * @param table        keyspace ("table") the column family belongs to
+     * @param columnFamily column family to read
+     * @param startToken   start of the token range; must not be null
+     * @param endToken     end of the token range; must not be null
+     * @param dataNodes    values to report from getLocations()
+     */
+    public ColumnFamilySplit(String table, String columnFamily, String startToken, String endToken, String[] dataNodes)
+    {
+        assert startToken != null;
+        assert endToken != null;
+        this.table = table;
+        this.columnFamily = columnFamily;
+        this.startToken = startToken;
+        this.endToken = endToken;
+        this.dataNodes = dataNodes;
+    }
+
+    // No-arg constructor for deserialization; read() uses it before
+    // calling readFields().
+    protected ColumnFamilySplit() {}
+
+    /** @return the start token of this split's range */
+    public String getStartToken()
+    {
+        return startToken;
+    }
+
+    /** @return the end token of this split's range */
+    public String getEndToken()
+    {
+        return endToken;
+    }
+
+    /** @return the keyspace ("table") name */
+    public String getTable()
+    {
+        return table;
+    }
+
+    /** @return the column family name */
+    public String getColumnFamily()
+    {
+        return columnFamily;
+    }
+
+    // getLength and getLocations fulfil the InputSplit contract
+
+    /** @return always 0: the length is only used for sorting splits, and no size is available yet */
+    @Override
+    public long getLength()
+    {
+        return 0;
+    }
+
+    /** @return the data nodes given at construction or read by readFields() */
+    @Override
+    public String[] getLocations()
+    {
+        return dataNodes;
+    }
+
+    // The remaining three methods serialize and deserialize a
+    // ColumnFamilySplit as required by the Writable interface.
+
+    /**
+     * Writes table, column family, start token and end token as UTF strings,
+     * then the number of data nodes followed by each node as a UTF string.
+     */
+    public void write(DataOutput out) throws IOException
+    {
+        out.writeUTF(table);
+        out.writeUTF(columnFamily);
+        out.writeUTF(startToken);
+        out.writeUTF(endToken);
+
+        out.writeInt(dataNodes.length);
+        for (int idx = 0; idx < dataNodes.length; idx++)
+        {
+            out.writeUTF(dataNodes[idx]);
+        }
+    }
+
+    /** Restores every field, reading them in the order write() emits them. */
+    public void readFields(DataInput in) throws IOException
+    {
+        table = in.readUTF();
+        columnFamily = in.readUTF();
+        startToken = in.readUTF();
+        endToken = in.readUTF();
+
+        dataNodes = new String[in.readInt()];
+        for (int idx = 0; idx < dataNodes.length; idx++)
+        {
+            dataNodes[idx] = in.readUTF();
+        }
+    }
+
+    /**
+     * Creates an empty split and populates it from the given input.
+     *
+     * @return the deserialized split
+     */
+    public static ColumnFamilySplit read(DataInput in) throws IOException
+    {
+        ColumnFamilySplit split = new ColumnFamilySplit();
+        split.readFields(in);
+        return split;
+    }
+}
\ No newline at end of file

Propchange: incubator/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilySplit.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message