cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/4] cassandra git commit: Remove deprecated legacy Hadoop code
Date Mon, 08 Jun 2015 19:43:40 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk d62cd1bc9 -> 446e25378


http://git-wip-us.apache.org/repos/asf/cassandra/blob/446e2537/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index 60d04d3..93ac5be 100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -22,24 +22,21 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.ColumnOrSuperColumn;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.thrift.TException;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ThriftColumnFamilyTest extends PigTestBase
-{    
+{
     private static String[] statements = {
             "DROP KEYSPACE IF EXISTS thrift_ks",
             "CREATE KEYSPACE thrift_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor':
1};",
@@ -171,7 +168,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
     };
 
     @BeforeClass
-    public static void setup() throws IOException, ConfigurationException, TException
+    public static void setup() throws IOException, ConfigurationException
     {
         startCassandra();
         executeCQLStatements(statements);
@@ -300,11 +297,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageSchema() throws IOException
+    public void testCqlNativeStorageSchema() throws IOException
     {
         //results: (qux,(atomic_weight,0.660161815846869),(created,1335890877),(name,User
Qux),(percent,64.7),
         //(rating,2),(score,12000),(vote_type,dislike))
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters +
"' USING CqlNativeStorage();");
 
         //schema: {key: chararray,atomic_weight: (name: chararray,value: double),created:
(name: chararray,value: long),
         //name: (name: chararray,value: chararray),percent: (name: chararray,value: float),
@@ -339,12 +336,13 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageFullCopy() throws IOException, TException
+    public void testCqlNativeStorageFullCopy() throws IOException
     {
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters +
nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
+        pig.registerQuery("records = FOREACH rows GENERATE TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight,
created, name, percent, rating, score, vote_type);");
         //full copy
-        pig.registerQuery("STORE rows INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("STORE records INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters
+ nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F'
USING CqlNativeStorage();");
         pig.executeBatch();
         Assert.assertEquals("User Qux", getColumnValue("thrift_ks", "copy_of_some_app", "name",
"qux", "UTF8Type"));
         Assert.assertEquals("dislike", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type",
"qux", "UTF8Type"));
@@ -352,158 +350,94 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageSingleTupleCopy() throws IOException, TException
+    public void testCqlNativeStorageSingleTupleCopy() throws IOException
     {
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters +
nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
         //single tuple
-        pig.registerQuery("onecol = FOREACH rows GENERATE key, percent;");
-        pig.registerQuery("STORE onecol INTO 'cassandra://thrift_ks/copy_of_some_app?" +
defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("onecol = FOREACH rows GENERATE TOTUPLE(TOTUPLE('key', key)), TOTUPLE(percent);");
+        pig.registerQuery("STORE onecol INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters
+ nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+percent+%3D+%3F'
USING CqlNativeStorage();");
         pig.executeBatch();
-        String value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        String value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type");
         if (value != null)
             Assert.fail();
         Assert.assertEquals("64.7", getColumnValue("thrift_ks", "copy_of_some_app", "percent",
"qux", "FloatType"));
     }
 
     @Test
-    public void testCassandraStorageBagOnlyCopy() throws IOException, TException
+    public void testCqlNativeStorageBagOnlyCopy() throws IOException
     {
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters +
nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
         //bag only
-        pig.registerQuery("other = FOREACH rows GENERATE key, columns;");
-        pig.registerQuery("STORE other INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("other = FOREACH rows GENERATE TOTUPLE(TOTUPLE('key', key)), TOTUPLE();");
+        pig.registerQuery("STORE other INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters
+ nativeParameters + "' USING CqlNativeStorage();");
         pig.executeBatch();
-        String value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        String value = getColumnValue("thrift_ks", "copy_of_some_app", "name", "qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "percent", "qux", "FloatType");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "percent", "qux", "FloatType");
         if (value != null)
             Assert.fail();
     }
 
     @Test
-    public void testCassandraStorageFilter() throws IOException, TException
+    public void testCqlNativeStorageFilter() throws IOException
     {
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters +
nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
 
         //filter
-        pig.registerQuery("likes = FILTER rows by vote_type.value eq 'like' and rating.value
> 5;");
-        pig.registerQuery("STORE likes INTO 'cassandra://thrift_ks/copy_of_some_app?" + defaultParameters
+ "' USING CassandraStorage();");
+        pig.registerQuery("likes = FILTER rows by vote_type eq 'like' and rating > 5;");
+        pig.registerQuery("records = FOREACH likes GENERATE TOTUPLE(TOTUPLE('key', key)),TOTUPLE(atomic_weight,
created, name, percent, rating, score, vote_type);");
+        pig.registerQuery("STORE records INTO 'cql://thrift_ks/copy_of_some_app?" + defaultParameters
+ nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F'
USING CqlNativeStorage();");
         pig.executeBatch();
 
         Assert.assertEquals("like", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type",
"bar", "UTF8Type"));
         Assert.assertEquals("like", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type",
"foo", "UTF8Type"));
-        String value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        String value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "qux",
"UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "baz", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "baz", "UTF8Type");
+
         if (value != null)
             Assert.fail();
 
         executeCQLStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
-        pig.registerQuery("rows = LOAD 'cassandra://thrift_ks/some_app?" + defaultParameters
+ "' USING CassandraStorage();");
-        pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 'dislike';");
-        pig.registerQuery("STORE dislikes_extras INTO 'cassandra://thrift_ks/copy_of_some_app?"
+ defaultParameters + "' USING CassandraStorage();");
+        pig.registerQuery("rows = LOAD 'cql://thrift_ks/some_app?" + defaultParameters +
nativeParameters + "&input_cql=select%20*%20from%20some_app%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
+        pig.registerQuery("dislikes_extras = FILTER rows by vote_type eq 'dislike';");
+        pig.registerQuery("dislikes_records = FOREACH dislikes_extras GENERATE TOTUPLE(TOTUPLE('key',
key)),TOTUPLE(atomic_weight, created, name, percent, rating, score, vote_type);");
+        pig.registerQuery("STORE dislikes_records INTO 'cql://thrift_ks/copy_of_some_app?"
+ defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.copy_of_some_app+set+atomic_weight+%3D+%3F,+created+%3D+%3F,+name+%3D+%3F,+percent+%3D+%3F,+rating+%3D+%3F,+score+%3D+%3F,+vote_type+%3D+%3F'
USING CqlNativeStorage();");
         pig.executeBatch();
         Assert.assertEquals("dislike", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type",
"baz", "UTF8Type"));
         Assert.assertEquals("dislike", getColumnValue("thrift_ks", "copy_of_some_app", "vote_type",
"qux", "UTF8Type"));
-        value = null;
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "bar", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "bar", "UTF8Type");
         if (value != null)
             Assert.fail();
-        try
-        {
-            value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "foo", "UTF8Type");
-        }
-        catch (NotFoundException e)
-        {
-            Assert.assertTrue(true);
-        }
+        value = getColumnValue("thrift_ks", "copy_of_some_app", "vote_type", "foo", "UTF8Type");
         if (value != null)
             Assert.fail();
     }
 
     @Test
-    public void testCassandraStorageJoin() throws IOException
+    public void testCqlNativeStorageJoin() throws IOException
     {
         //test key types with a join
-        pig.registerQuery("U8 = load 'cassandra://thrift_ks/u8?" + defaultParameters + "'
using CassandraStorage();");
-        pig.registerQuery("Bytes = load 'cassandra://thrift_ks/bytes?" + defaultParameters
+ "' using CassandraStorage();");
+        pig.registerQuery("U8 = load 'cql://thrift_ks/u8?" + defaultParameters + nativeParameters
+ "&input_cql=select%20*%20from%20u8%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("Bytes = load 'cql://thrift_ks/bytes?" + defaultParameters + nativeParameters
+ "&input_cql=select%20*%20from%20bytes%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
 
         //cast key to chararray
-        pig.registerQuery("b = foreach Bytes generate (chararray)key, columns;");
+        pig.registerQuery("b = foreach Bytes generate (chararray)key, column1, value;");
 
         //key in Bytes is a bytearray, U8 chararray
         //(foo,{(x,Z)},foo,{(x,Z)})
@@ -512,18 +446,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
         if (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), new DataByteArray("foo".getBytes()));
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            Tuple t1 = iter.next();
-            Assert.assertEquals(t1.get(0), "x");
-            Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
-            String column = (String) t.get(2);
-            Assert.assertEquals(column, "foo");
-            columns = (DataBag) t.get(3);
-            iter = columns.iterator();
-            Tuple t2 = iter.next();
-            Assert.assertEquals(t2.get(0), "x");
-            Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(1), "x");
+            Assert.assertEquals(t.get(2), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(3), "foo");
+            Assert.assertEquals(t.get(4), "x");
+            Assert.assertEquals(t.get(5), new DataByteArray("Z".getBytes()));
         }
         //key should now be cast into a chararray
         //(foo,{(x,Z)},foo,{(x,Z)})
@@ -532,27 +459,22 @@ public class ThriftColumnFamilyTest extends PigTestBase
         if (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), "foo");
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            Tuple t1 = iter.next();
-            Assert.assertEquals(t1.get(0), "x");
-            Assert.assertEquals(t1.get(1), new DataByteArray("Z".getBytes()));
-            String column = (String) t.get(2);
-            Assert.assertEquals(column, "foo");
-            columns = (DataBag) t.get(3);
-            iter = columns.iterator();
-            Tuple t2 = iter.next();
-            Assert.assertEquals(t2.get(0), "x");
-            Assert.assertEquals(t2.get(1), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(1), "x");
+            Assert.assertEquals(t.get(2), new DataByteArray("Z".getBytes()));
+            Assert.assertEquals(t.get(3), "foo");
+            Assert.assertEquals(t.get(4), "x");
+            Assert.assertEquals(t.get(5), new DataByteArray("Z".getBytes()));
         }
     }
 
     @Test
-    public void testCassandraStorageCounterCF() throws IOException
+    public void testCqlNativeStorageCounterCF() throws IOException
     {
         //Test counter column family support
-        pig.registerQuery("CC = load 'cassandra://thrift_ks/cc?" + defaultParameters + "'
using CassandraStorage();");
-        pig.registerQuery("total_hits = foreach CC generate key, SUM(columns.value);");
+        pig.registerQuery("CC = load 'cql://thrift_ks/cc?" + defaultParameters + nativeParameters
+ "&input_cql=select%20*%20from%20cc%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("A = foreach CC generate key, name, value;");
+        pig.registerQuery("B = GROUP A BY key;");
+        pig.registerQuery("total_hits = foreach B generate group, SUM(A.value);");
         //(chuck,4)
         Tuple t = pig.openIterator("total_hits").next();
         Assert.assertEquals(t.get(0), "chuck");
@@ -560,12 +482,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageCompositeColumnCF() throws IOException
+    public void testCqlNativeStorageCompositeColumnCF() throws IOException
     {
         //Test CompositeType
-        pig.registerQuery("compo = load 'cassandra://thrift_ks/compo?" + defaultParameters
+ "' using CassandraStorage();");
-        pig.registerQuery("compo = foreach compo generate key as method, flatten(columns);");
-        pig.registerQuery("lee = filter compo by columns::name == ('bruce','lee');");
+        pig.registerQuery("compo = load 'cql://thrift_ks/compo?" + defaultParameters + nativeParameters
+ "&input_cql=select%20*%20from%20compo%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("lee = filter compo by column1 == 'bruce' AND column2 == 'lee';");
 
         //(kick,(bruce,lee),oww)
         //(punch,(bruce,lee),ouch)
@@ -574,18 +495,16 @@ public class ThriftColumnFamilyTest extends PigTestBase
         while (it.hasNext()) {
             count ++;
             Tuple t = it.next();
-            Tuple t1 = (Tuple) t.get(1);
-            Assert.assertEquals(t1.get(0), "bruce");
-            Assert.assertEquals(t1.get(1), "lee");
+            Assert.assertEquals(t.get(1), "bruce");
+            Assert.assertEquals(t.get(2), "lee");
             if ("kick".equals(t.get(0)))
-                Assert.assertEquals(t.get(2), "oww");
-            else if ("kick".equals(t.get(0)))
-                Assert.assertEquals(t.get(2), "ouch");
+                Assert.assertEquals(t.get(3), "oww");
+            else
+                Assert.assertEquals(t.get(3), "ouch");
         }
         Assert.assertEquals(count, 2);
-        pig.registerQuery("night = load 'cassandra://thrift_ks/compo_int?" + defaultParameters
+ "' using CassandraStorage();");
-        pig.registerQuery("night = foreach night generate flatten(columns);");
-        pig.registerQuery("night = foreach night generate (int)columns::name.$0+(double)columns::name.$1/60
as hour, columns::value as noise;");
+        pig.registerQuery("night = load 'cql://thrift_ks/compo_int?" + defaultParameters
+ nativeParameters + "&input_cql=select%20*%20from%20compo_int%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("night = foreach night generate (int)column1+(double)column2/60
as hour, value as noise;");
 
         //What happens at the darkest hour?
         pig.registerQuery("darkest = filter night by hour > 2 and hour < 5;");
@@ -598,10 +517,10 @@ public class ThriftColumnFamilyTest extends PigTestBase
             Assert.assertEquals(t.get(1), "daddy?");
         }
         pig.setBatchOn();
-        pig.registerQuery("compo_int_rows = LOAD 'cassandra://thrift_ks/compo_int?" + defaultParameters
+ "' using CassandraStorage();");
-        pig.registerQuery("STORE compo_int_rows INTO 'cassandra://thrift_ks/compo_int_copy?"
+ defaultParameters + "' using CassandraStorage();");
+        pig.registerQuery("compo_int_rows = LOAD 'cql://thrift_ks/compo_int?" + defaultParameters
+ nativeParameters + "&input_cql=select%20*%20from%20compo_int%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("STORE compo_int_rows INTO 'cql://thrift_ks/compo_int_copy?" +
defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.compo_int_copy+set+column1+%3D+%3F,+column2+%3D+%3F,+value+%3D+%3F'
using CqlNativeStorage();");
         pig.executeBatch();
-        pig.registerQuery("compocopy_int_rows = LOAD 'cassandra://thrift_ks/compo_int_copy?"
+ defaultParameters + "' using CassandraStorage();");
+        pig.registerQuery("compocopy_int_rows = LOAD 'cql://thrift_ks/compo_int_copy?" +
defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_int_copy%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
using CqlNativeStorage();");
         //(clock,{((1,0),z),((1,30),zzzz),((2,30),daddy?),((6,30),coffee...)})
         it = pig.openIterator("compocopy_int_rows");
         count = 0;
@@ -627,32 +546,26 @@ public class ThriftColumnFamilyTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageCompositeKeyCF() throws IOException
+    public void testCqlNativeStorageCompositeKeyCF() throws IOException
     {
         //Test CompositeKey
-        pig.registerQuery("compokeys = load 'cassandra://thrift_ks/compo_key?" + defaultParameters
+ "' using CassandraStorage();");
-        pig.registerQuery("compokeys = filter compokeys by key.$1 == 40;");
-        //((clock,40),{(6,coffee...)})
+        pig.registerQuery("compokeys = load 'cql://thrift_ks/compo_key?" + defaultParameters
+ nativeParameters + "&input_cql=select%20*%20from%20compo_key%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("compokeys = filter compokeys by column1 == 40;");
+        //(clock,40,6,coffee...)
         Iterator<Tuple> it = pig.openIterator("compokeys");
         if (it.hasNext()) {
             Tuple t = it.next();
-            Tuple key = (Tuple) t.get(0); 
-            Assert.assertEquals(key.get(0), "clock");
-            Assert.assertEquals(key.get(1), 40L);
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            if (iter.hasNext())
-            {
-                Tuple t1 = iter.next();
-                Assert.assertEquals(t1.get(0), 6L);
-                Assert.assertEquals(t1.get(1), "coffee...");
-            }
+            Assert.assertEquals(t.get(0), "clock");
+            Assert.assertEquals(t.get(1), 40L);
+            Assert.assertEquals(t.get(2), 6L);
+            Assert.assertEquals(t.get(3), "coffee...");
         }
         pig.setBatchOn();
-        pig.registerQuery("compo_key_rows = LOAD 'cassandra://thrift_ks/compo_key?" + defaultParameters
+ "' using CassandraStorage();");
-        pig.registerQuery("STORE compo_key_rows INTO 'cassandra://thrift_ks/compo_key_copy?"
+ defaultParameters + "' using CassandraStorage();");
+        pig.registerQuery("compo_key_rows = LOAD 'cql://thrift_ks/compo_key?" + defaultParameters
+ nativeParameters + "&input_cql=select%20*%20from%20compo_key%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F'
using CqlNativeStorage();");
+        pig.registerQuery("compo_key_rows = FOREACH compo_key_rows GENERATE TOTUPLE(TOTUPLE('key',key),TOTUPLE('column1',column1),TOTUPLE('column2',column2)),TOTUPLE(value);");
+        pig.registerQuery("STORE compo_key_rows INTO 'cql://thrift_ks/compo_key_copy?" +
defaultParameters + nativeParameters + "&output_query=UPDATE+thrift_ks.compo_key_copy+set+value+%3D+%3F'
using CqlNativeStorage();");
         pig.executeBatch();
-        pig.registerQuery("compo_key_copy_rows = LOAD 'cassandra://thrift_ks/compo_key_copy?"
+ defaultParameters + "' using CassandraStorage();");
+        pig.registerQuery("compo_key_copy_rows = LOAD 'cql://thrift_ks/compo_key_copy?" +
defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20compo_key_copy%20where%20token(key,column1)%20%3E%20%3F%20and%20token(key,column1)%20%3C%3D%20%3F'
using CqlNativeStorage();");
         //((clock,10),{(1,z)})
         //((clock,20),{(1,zzzz)})
         //((clock,30),{(2,daddy?)})
@@ -662,66 +575,43 @@ public class ThriftColumnFamilyTest extends PigTestBase
         while (it.hasNext()) {
             Tuple t = it.next();
             count ++;
-            Tuple key = (Tuple) t.get(0); 
-            if ("clock".equals(key.get(0)) && (Long) key.get(1) == 10L)
+            if ("clock".equals(t.get(0)) && (Long) t.get(1) == 10L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 1L);
-                    Assert.assertEquals(t1.get(1), "z");
-                }
+                Assert.assertEquals(t.get(2), 1L);
+                Assert.assertEquals(t.get(3), "z");
             }
-            else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 40L)
+            else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 40L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 6L);
-                    Assert.assertEquals(t1.get(1), "coffee...");
-                }
+                Assert.assertEquals(t.get(2), 6L);
+                Assert.assertEquals(t.get(3), "coffee...");
             }
-            else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 20L)
+            else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 20L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 1L);
-                    Assert.assertEquals(t1.get(1), "zzzz");
-                }
+                Assert.assertEquals(t.get(2), 1L);
+                Assert.assertEquals(t.get(3), "zzzz");
             }
-            else if ("clock".equals(key.get(0)) && (Long) key.get(1) == 30L)
+            else if ("clock".equals(t.get(0)) && (Long) t.get(1) == 30L)
             {
-                DataBag columns = (DataBag) t.get(1);
-                Iterator<Tuple> iter = columns.iterator();
-                if (iter.hasNext())
-                {
-                    Tuple t1 = iter.next();
-                    Assert.assertEquals(t1.get(0), 2L);
-                    Assert.assertEquals(t1.get(1), "daddy?");
-                }
+                Assert.assertEquals(t.get(2), 2L);
+                Assert.assertEquals(t.get(3), "daddy?");
             }
         }
         Assert.assertEquals(4, count);
     }
 
-    private String getColumnValue(String ks, String cf, String colName, String key, String
validator) throws TException, IOException
+    private String getColumnValue(String ks, String cf, String colName, String key, String
validator) throws IOException
     {
-        Cassandra.Client client = getClient();
-        client.set_keyspace(ks);
+        Session client = getClient();
+        client.execute("USE " + ks);
+
+        String query = String.format("SELECT %s FROM %s WHERE key = '%s'", colName, cf, key);
+
+        ResultSet rows = client.execute(query);
+        Row row = rows.one();
 
-        ByteBuffer key_user_id = ByteBufferUtil.bytes(key);
-        ColumnPath cp = new ColumnPath(cf);
-        cp.column = ByteBufferUtil.bytes(colName);
+        if (row == null || row.isNull(0))
+            return null;
 
-        // read
-        ColumnOrSuperColumn got = client.get(key_user_id, cp, ConsistencyLevel.ONE);
-        return parseType(validator).getString(got.getColumn().value);
+        return parseType(validator).getString(row.getBytesUnsafe(0));
     }
 }


Mime
View raw message