cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [2/3] git commit: Predicate pushdown support for CqlStorage Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5790
Date Thu, 25 Jul 2013 15:53:34 GMT
Predicate pushdown support for CqlStorage
Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-5790


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3dbc3dd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3dbc3dd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3dbc3dd

Branch: refs/heads/trunk
Commit: a3dbc3ddc720f9ca4a9075fa542b3a8af54187fc
Parents: 422d223
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Thu Jul 25 10:52:26 2013 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Thu Jul 25 10:52:26 2013 -0500

----------------------------------------------------------------------
 examples/pig/README.txt                         |  2 +-
 .../hadoop/pig/AbstractCassandraStorage.java    | 48 +++++++++--
 .../cassandra/hadoop/pig/CassandraStorage.java  | 30 -------
 .../apache/cassandra/hadoop/pig/CqlStorage.java | 84 +++++++++++++-------
 4 files changed, 98 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/examples/pig/README.txt
----------------------------------------------------------------------
diff --git a/examples/pig/README.txt b/examples/pig/README.txt
index 6dc0937..2ae9824 100644
--- a/examples/pig/README.txt
+++ b/examples/pig/README.txt
@@ -100,7 +100,7 @@ CqlStorage
 
 The CqlStorage class is somewhat similar to CassandraStorage, but it can work with CQL3-defined ColumnFamilies.  The main difference is in the URL format:
 
-cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]
+cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>][&where_clause=<clause>][&split_size=<size>][&use_secondary=true|false][&partitioner=<partitioner>]]
 
 Which in grunt, the simplest example would look like:
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index ff575b2..59d7817 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -18,7 +18,9 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.math.BigInteger;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
@@ -76,6 +78,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected String DEFAULT_INPUT_FORMAT;
     protected String DEFAULT_OUTPUT_FORMAT;
 
+    public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
+
     protected static final Logger logger = LoggerFactory.getLogger(AbstractCassandraStorage.class);
 
     protected String username;
@@ -90,6 +94,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected String outputFormatClass;
     protected int splitSize = 64 * 1024;
     protected String partitionerClass;
+    protected boolean usePartitionFilter = false; 
 
     public AbstractCassandraStorage()
     {
@@ -248,15 +253,15 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         }
     }
 
-    /** decompose the query to store the parameters in a map*/
-    public static Map<String, String> getQueryMap(String query)
+    /** decompose the query to store the parameters in a map */
+    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
     {
         String[] params = query.split("&");
         Map<String, String> map = new HashMap<String, String>();
         for (String param : params)
         {
             String[] keyValue = param.split("=");
-            map.put(keyValue[0], keyValue[1]);
+            map.put(keyValue[0], URLDecoder.decode(keyValue[1],"UTF-8"));
         }
         return map;
     }
@@ -674,7 +679,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             logger.debug("Found ksDef name: {}", name);
             String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
 
-            logger.debug("partition keys: " + keyString);
+            logger.debug("partition keys: {}", keyString);
             List<String> keyNames = FBUtilities.fromJsonList(keyString);
  
             Iterator<String> iterator = keyNames.iterator();
@@ -687,7 +692,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
 
             keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
 
-            logger.debug("cluster keys: " + keyString);
+            logger.debug("cluster keys: {}", keyString);
             keyNames = FBUtilities.fromJsonList(keyString);
 
             iterator = keyNames.iterator();
@@ -699,7 +704,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
             }
 
             String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
-            logger.debug("row key validator: " + validator);
+            logger.debug("row key validator: {}", validator);
             AbstractType<?> keyValidator = parseType(validator);
 
             Iterator<ColumnDef> keyItera = keys.iterator();
@@ -713,7 +718,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 keyItera.next().validation_class = keyValidator.toString();
 
             validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
-            logger.debug("cluster key validator: " + validator);
+            logger.debug("cluster key validator: {}", validator);
 
             if (keyItera.hasNext() && validator != null && !validator.isEmpty())
             {
@@ -735,7 +740,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
                 try
                 {
                     String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
-                    logger.debug("default validator: " + compactValidator);
+                    logger.debug("default validator: {}", compactValidator);
                     AbstractType<?> defaultValidator = parseType(compactValidator);
 
                     ColumnDef cDef = new ColumnDef();
@@ -766,5 +771,32 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         else
             return null;
     }
+
+    /** return partition keys */
+    public String[] getPartitionKeys(String location, Job job)
+    {
+        if (!usePartitionFilter)
+            return null;
+        List<ColumnDef> indexes = getIndexes();
+        String[] partitionKeys = new String[indexes.size()];
+        for (int i = 0; i < indexes.size(); i++)
+        {
+            partitionKeys[i] = new String(indexes.get(i).getName());
+        }
+        return partitionKeys;
+    }
+
+    /** get a list of columns with defined index*/
+    protected List<ColumnDef> getIndexes()
+    {
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
+        {
+            if (cdef.index_type != null)
+                indexes.add(cdef);
+        }
+        return indexes;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index ed445a2..add4395 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -54,8 +54,6 @@ public class CassandraStorage extends AbstractCassandraStorage
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
     public final static String PIG_USE_SECONDARY = "PIG_USE_SECONDARY";
 
-    private final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
-
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Logger logger = LoggerFactory.getLogger(CassandraStorage.class);
 
@@ -68,7 +66,6 @@ public class CassandraStorage extends AbstractCassandraStorage
     private RecordWriter<ByteBuffer, List<Mutation>> writer;
 
     private boolean widerows = false;
-    private boolean usePartitionFilter = false;
     private int limit;
     
     // wide row hacks
@@ -455,20 +452,6 @@ public class CassandraStorage extends AbstractCassandraStorage
         return schema;
     }
 
-    /** return partition keys */
-    public String[] getPartitionKeys(String location, Job job)
-    {
-        if (!usePartitionFilter)
-            return null;
-        List<ColumnDef> indexes = getIndexes();
-        String[] partitionKeys = new String[indexes.size()];
-        for (int i = 0; i < indexes.size(); i++)
-        {
-            partitionKeys[i] = new String(indexes.get(i).getName());
-        }
-        return partitionKeys;
-    }
-
     /** set partition filter */
     public void setPartitionFilter(Expression partitionFilter)
     {
@@ -665,19 +648,6 @@ public class CassandraStorage extends AbstractCassandraStorage
         return indexExpressions;
     }
 
-    /** get a list of columns with defined index*/
-    private List<ColumnDef> getIndexes()
-    {
-        CfDef cfdef = getCfDef(loadSignature);
-        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
-        for (ColumnDef cdef : cfdef.column_metadata)
-        {
-            if (cdef.index_type != null)
-                indexes.add(cdef);
-        }
-        return indexes;
-    }
-
     /** convert a list of index expression to string */
     private static String indexExpressionsToString(List<IndexExpression> indexExpressions)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3dbc3dd/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 004b319..7e22823 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -32,10 +32,12 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
+import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,9 +139,18 @@ public class CqlStorage extends AbstractCassandraStorage
 
         CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
         if (columns != null && !columns.trim().isEmpty())
-            CqlConfigHelper.setInputColumns(conf, columns);        
-        if (whereClause != null && !whereClause.trim().isEmpty())
-            CqlConfigHelper.setInputWhereClauses(conf, whereClause);
+            CqlConfigHelper.setInputColumns(conf, columns);
+
+        String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
+        String wc = whereClause != null && !whereClause.trim().isEmpty() 
+                               ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
+                               : whereClauseForPartitionFilter;
+
+        if (wc != null)
+        {
+            logger.debug("where clause: {}", wc);
+            CqlConfigHelper.setInputWhereClauses(conf, wc);
+        } 
 
         if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
         {
@@ -193,7 +204,7 @@ public class CqlStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
     
-    /** schema: ((name, value), (name, value), (name, value)) where keys are in the front. */
+    /** schema: (value, value, value) where keys are in the front. */
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
@@ -209,28 +220,15 @@ public class CqlStorage extends AbstractCassandraStorage
         // will contain all fields for this schema
         List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
 
-        // defined validators/indexes
         for (ColumnDef cdef : cfDef.column_metadata)
         {
-            // make a new tuple for each col/val pair
-            ResourceSchema innerTupleSchema = new ResourceSchema();
-            ResourceFieldSchema innerTupleField = new ResourceFieldSchema();
-            innerTupleField.setType(DataType.TUPLE);
-            innerTupleField.setSchema(innerTupleSchema);
-            innerTupleField.setName(new String(cdef.getName()));            
-            ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
-            idxColSchema.setName("name");
-            idxColSchema.setType(getPigType(UTF8Type.instance));
-
             ResourceFieldSchema valSchema = new ResourceFieldSchema();
             AbstractType validator = validators.get(cdef.name);
             if (validator == null)
                 validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-            valSchema.setName("value");
+            valSchema.setName(new String(cdef.getName()));
             valSchema.setType(getPigType(validator));
-
-            innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
-            allSchemaFields.add(innerTupleField);
+            allSchemaFields.add(valSchema);
         }
 
         // top level schema contains everything
@@ -238,16 +236,19 @@ public class CqlStorage extends AbstractCassandraStorage
         return schema;
     }
 
-
-    /** We use CQL3 where clause to define the partition, so do nothing here*/
-    public String[] getPartitionKeys(String location, Job job)
+    public void setPartitionFilter(Expression partitionFilter)
     {
-        return null;
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+        property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
     }
 
-    /** We use CQL3 where clause to define the partition, so do nothing here*/
-    public void setPartitionFilter(Expression partitionFilter)
+    /** retrieve where clause for partition filter */
+    private String getWhereClauseForPartitionFilter()
     {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
+        return property.getProperty(PARTITION_FILTER_SIGNATURE);
     }
     
     public void prepareToWrite(RecordWriter writer)
@@ -386,7 +387,7 @@ public class CqlStorage extends AbstractCassandraStorage
     
     /** cql://[username:password@]<keyspace>/<columnfamily>[?[page_size=<size>]
      * [&columns=<col1,col2>][&output_query=<prepared_statement_query>][&where_clause=<clause>]
-     * [&split_size=<size>][&partitioner=<partitioner>]] */
+     * [&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]] */
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -420,6 +421,8 @@ public class CqlStorage extends AbstractCassandraStorage
                     splitSize = Integer.parseInt(urlQuery.get("split_size"));
                 if (urlQuery.containsKey("partitioner"))
                     partitionerClass = urlQuery.get("partitioner");
+                if (urlQuery.containsKey("use_secondary"))
+                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));

             }
             String[] parts = urlParts[0].split("/+");
             String[] credentialsAndKeyspace = parts[1].split("@");
@@ -440,7 +443,34 @@ public class CqlStorage extends AbstractCassandraStorage
         {
             throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
             		                         "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
-            		                         "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>]]': " + e.getMessage());
+            		                         "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]]': " + e.getMessage());
+        }
+    }
+
+    /** 
+     * Return cql where clauses for the corresponding partition filter. Make sure the data format matches
+     * Only support the following Pig data types: int, long, float, double, boolean and chararray
+     * */
+    private String partitionFilterToWhereClauseString(Expression expression)
+    {
+        Expression.BinaryExpression be = (Expression.BinaryExpression) expression;
+        String name = be.getLhs().toString();
+        String value = be.getRhs().toString();
+        OpType op = expression.getOpType();
+        String opString = op.name();
+        switch (op)
+        {
+            case OP_EQ:
+                opString = " = ";
+            case OP_GE:
+            case OP_GT:
+            case OP_LE:
+            case OP_LT:
+                return String.format("%s %s %s", name, opString, value);
+            case OP_AND:
+                return String.format("%s AND %s", partitionFilterToWhereClauseString(be.getLhs()), partitionFilterToWhereClauseString(be.getRhs()));
+            default:
+                throw new RuntimeException("Unsupported expression type: " + opString);
         }
     }
 }


Mime
View raw message