cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject svn commit: r1149341 - /cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Date Thu, 21 Jul 2011 20:10:55 GMT
Author: brandonwilliams
Date: Thu Jul 21 20:10:54 2011
New Revision: 1149341

URL: http://svn.apache.org/viewvc?rev=1149341&view=rev
Log:
Use a UDF-specific context signature.
Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2869

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1149341&r1=1149340&r2=1149341&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
(original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
Thu Jul 21 20:10:54 2011
@@ -68,8 +68,6 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
-    private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
-
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
@@ -78,6 +76,8 @@ public class CassandraStorage extends Lo
     private boolean slice_reverse = false;
     private String keyspace;
     private String column_family;
+    private String loadSignature;
+    private String storeSignature;
 
     private Configuration conf;
     private RecordReader reader;
@@ -112,7 +112,7 @@ public class CassandraStorage extends Lo
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef();
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
             SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
             assert key != null && cf != null;
@@ -165,11 +165,11 @@ public class CassandraStorage extends Lo
         return pair;
     }
 
-    private CfDef getCfDef()
+    private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(getSchemaContextKey()));
+        return cfdefFromString(property.getProperty(signature));
     }
 
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -289,7 +289,7 @@ public class CassandraStorage extends Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(loadSignature);
     }
 
     @Override
@@ -298,9 +298,16 @@ public class CassandraStorage extends Lo
         return location;
     }
 
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
     /* StoreFunc methods */
     public void setStoreFuncUDFContextSignature(String signature)
     {
+        this.storeSignature = signature;
     }
 
     public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
@@ -314,7 +321,7 @@ public class CassandraStorage extends Lo
         setLocationFromUri(location);
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(storeSignature);
     }
 
     public OutputFormat getOutputFormat()
@@ -346,7 +353,7 @@ public class CassandraStorage extends Lo
         ByteBuffer key = objToBB(t.get(0));
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
-        CfDef cfDef = getCfDef();
+        CfDef cfDef = getCfDef(storeSignature);
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         try
@@ -404,7 +411,6 @@ public class CassandraStorage extends Lo
                        column.timestamp = System.currentTimeMillis() * 1000;
                        mutation.column_or_supercolumn = new ColumnOrSuperColumn();
                        mutation.column_or_supercolumn.column = column;
-                       mutationList.add(mutation);
                    }
                }
                mutationList.add(mutation);
@@ -412,7 +418,7 @@ public class CassandraStorage extends Lo
         }
         catch (ClassCastException e)
         {
-            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily
or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily
or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
         }
         try
         {
@@ -430,14 +436,13 @@ public class CassandraStorage extends Lo
 
     /* Methods to get the column family schema from Cassandra */
 
-    private void initSchema()
+    private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
 
-        String schemaContextKey = getSchemaContextKey();
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(schemaContextKey))
+        if (!property.containsKey(signature))
         {
             Cassandra.Client client = null;
             try
@@ -455,7 +460,7 @@ public class CassandraStorage extends Lo
                         break;
                     }
                 }
-                property.setProperty(schemaContextKey, cfdefToString(cfDef));
+                property.setProperty(signature, cfdefToString(cfDef));
             }
             catch (TException e)
             {
@@ -521,14 +526,4 @@ public class CassandraStorage extends Lo
         }
         return cfDef;
     }
-
-    private String getSchemaContextKey()
-    {
-        StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
-        sb.append('.');
-        sb.append(keyspace);
-        sb.append('.');
-        sb.append(column_family);
-        return sb.toString();
-    }
 }



Mime
View raw message