Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8562B10CE5 for ; Wed, 27 Nov 2013 16:45:12 +0000 (UTC) Received: (qmail 15897 invoked by uid 500); 27 Nov 2013 16:44:52 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 15773 invoked by uid 500); 27 Nov 2013 16:44:42 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 15428 invoked by uid 99); 27 Nov 2013 16:44:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Nov 2013 16:44:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D6627912830; Wed, 27 Nov 2013 16:44:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Wed, 27 Nov 2013 16:44:36 -0000 Message-Id: <66ffa4ae13f5482bb20cf55884902e91@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [6/7] git commit: move pig-test out of normal unit tests (still part of test-all) move pig-test out of normal unit tests (still part of test-all) Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ef33f954 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ef33f954 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ef33f954 Branch: refs/heads/trunk Commit: ef33f9543ed3555dca4a095417a3e0d53df5fbcc Parents: 8e82590 Author: Jonathan Ellis Authored: Wed Nov 27 10:43:30 2013 -0600 Committer: Jonathan Ellis Committed: Wed Nov 27 10:43:45 2013 -0600 ---------------------------------------------------------------------- build.xml | 12 +- test/Test.iml | 214 +++++ .../cassandra/pig/CqlTableDataTypeTest.java | 461 +++++++++++ .../org/apache/cassandra/pig/CqlTableTest.java | 254 ++++++ .../org/apache/cassandra/pig/PigTestBase.java | 185 +++++ .../pig/ThriftColumnFamilyDataTypeTest.java | 220 +++++ .../cassandra/pig/ThriftColumnFamilyTest.java | 827 +++++++++++++++++++ test/pig/org/apache/pig/test/MiniCluster.java | 78 ++ .../org/apache/pig/test/MiniGenericCluster.java | 122 +++ .../cassandra/pig/CqlTableDataTypeTest.java | 461 ----------- .../org/apache/cassandra/pig/CqlTableTest.java | 254 ------ .../org/apache/cassandra/pig/PigTestBase.java | 185 ----- .../pig/ThriftColumnFamilyDataTypeTest.java | 220 ----- .../cassandra/pig/ThriftColumnFamilyTest.java | 827 ------------------- test/unit/org/apache/pig/test/MiniCluster.java | 78 -- .../org/apache/pig/test/MiniGenericCluster.java | 122 --- 16 files changed, 2368 insertions(+), 2152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index cc957b2..41a7fb0 100644 --- a/build.xml +++ b/build.xml @@ -57,6 +57,7 @@ + @@ -1127,15 +1128,16 @@ - - + + + http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/Test.iml ---------------------------------------------------------------------- diff --git a/test/Test.iml b/test/Test.iml new file mode 100644 index 0000000..fca23cc --- /dev/null +++ b/test/Test.iml @@ -0,0 +1,214 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java new file mode 100644 index 0000000..1ae9806 --- /dev/null +++ b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.pig; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.IOException; +import java.nio.charset.CharacterCodingException; +import java.util.Iterator; + +import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.thrift.AuthenticationException; +import org.apache.cassandra.thrift.AuthorizationException; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SchemaDisagreementException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.Hex; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class CqlTableDataTypeTest extends PigTestBase +{ + //ASCII (AsciiType.instance), + //BIGINT (LongType.instance), + //BLOB (BytesType.instance), + //BOOLEAN (BooleanType.instance), + //COUNTER (CounterColumnType.instance), + //DECIMAL (DecimalType.instance), + //DOUBLE (DoubleType.instance), + //FLOAT (FloatType.instance), + //INET (InetAddressType.instance), + //INT (Int32Type.instance), + //TEXT (UTF8Type.instance), + //TIMESTAMP(DateType.instance), + //UUID (UUIDType.instance), + //VARCHAR (UTF8Type.instance), + //VARINT (IntegerType.instance), + //TIMEUUID (TimeUUIDType.instance); + //SET + //LIST + //MAP + //Create table to test the above data types + private static String[] statements = { + "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}", + "USE cql3ks;", + + "CREATE TABLE cqltable (" + + "key int primary key," + + "col_ascii ascii," + + "col_bigint bigint," + + "col_blob blob," + + "col_boolean boolean," + + "col_decimal decimal," + + "col_double double," + + "col_float float," + + "col_inet inet," + + "col_int int," + + "col_text text," + + "col_timestamp timestamp," + + "col_uuid uuid," + + "col_varchar varchar," + + "col_varint varint," + + "col_timeuuid timeuuid);", + + "CREATE TABLE settable (" + + "key int primary key," + + "col_set_ascii set," + + "col_set_bigint set," + + "col_set_blob set," + + "col_set_boolean set," + + "col_set_decimal set," + + "col_set_double set," + + "col_set_float set," + + "col_set_inet set," + + "col_set_int set," + + "col_set_text set," + + "col_set_timestamp set," + + "col_set_uuid set," + + "col_set_varchar set," + + "col_set_varint set," + + "col_set_timeuuid set);", + + "CREATE TABLE listtable (" + + "key int primary key," + + "col_list_ascii list," + + "col_list_bigint list," + + "col_list_blob list," + + "col_list_boolean list," + + "col_list_decimal list," + + "col_list_double list," + + "col_list_float list," + + "col_list_inet list," + + "col_list_int list," + + "col_list_text list," + + "col_list_timestamp list," + + "col_list_uuid list," + + "col_list_varchar list," + + "col_list_varint list," + + "col_list_timeuuid list);", + + "CREATE TABLE maptable (" + + "key int primary key," + + "col_map_ascii map," + + "col_map_bigint map," + + "col_map_blob map," + + "col_map_boolean map," + + "col_map_decimal map," + + "col_map_double map," + + "col_map_float map," + + "col_map_inet map," + + "col_map_int map," + + "col_map_text map," + + "col_map_timestamp map," + + "col_map_uuid map," + + "col_map_varchar map," + + "col_map_varint map," + + "col_map_timeuuid map);", + + "INSERT INTO cqltable(key, col_ascii) VALUES (1, 'ascii');", + "INSERT INTO cqltable(key, col_bigint) VALUES (1, 12345678);", + "INSERT INTO cqltable(key, col_blob) VALUES (1, 0x23446c6c6f);", + "INSERT INTO cqltable(key, col_boolean) VALUES (1, false);", + "INSERT INTO cqltable(key, col_decimal) VALUES (1, 23.4567);", + "INSERT INTO cqltable(key, col_double) VALUES (1, 12345678.12345678);", + "INSERT INTO cqltable(key, col_float) VALUES (1, 123.12);", + "INSERT INTO cqltable(key, col_inet) VALUES (1, '127.0.0.1');", + "INSERT INTO cqltable(key, col_int) VALUES (1, 123);", + "INSERT INTO cqltable(key, col_text) VALUES (1, 'text');", + "INSERT INTO cqltable(key, col_timestamp) VALUES (1, '2011-02-03T04:05:00+0000');", + "INSERT INTO cqltable(key, col_timeuuid) VALUES (1, maxTimeuuid('2013-01-01 00:05+0000'));", + "INSERT INTO cqltable(key, col_uuid) VALUES (1, 550e8400-e29b-41d4-a716-446655440000);", + "INSERT INTO cqltable(key, col_varchar) VALUES (1, 'varchar');", + "INSERT INTO cqltable(key, col_varint) VALUES (1, 123);", + + "INSERT INTO settable(key, col_set_ascii) VALUES (1, {'ascii1', 'ascii2'});", + "INSERT INTO settable(key, col_set_bigint) VALUES (1, {12345678, 12345679});", + "INSERT INTO settable(key, col_set_blob) VALUES (1, {0x68656c6c6f, 0x68656c6c6e});", + "INSERT INTO settable(key, col_set_boolean) VALUES (1, {false, true});", + "INSERT INTO settable(key, col_set_decimal) VALUES (1, {23.4567, 23.4568});", + "INSERT INTO settable(key, col_set_double) VALUES (1, {12345678.12345678, 12345678.12345679});", + "INSERT INTO settable(key, col_set_float) VALUES (1, {123.12, 123.13});", + "INSERT INTO settable(key, col_set_inet) VALUES (1, {'127.0.0.1', '127.0.0.2'});", + "INSERT INTO settable(key, col_set_int) VALUES (1, {123, 124});", + "INSERT INTO settable(key, col_set_text) VALUES (1, {'text1', 'text2'});", + "INSERT INTO settable(key, col_set_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000', '2011-02-04T04:05:00+0000'});", + "INSERT INTO settable(key, col_set_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});", + "INSERT INTO settable(key, col_set_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000, 550e8400-e29b-41d4-a716-446655440001});", + "INSERT INTO settable(key, col_set_varchar) VALUES (1, {'varchar1', 'varchar2'});", + "INSERT INTO settable(key, col_set_varint) VALUES (1, {123, 124});", + + "INSERT INTO listtable(key, col_list_ascii) VALUES (1, ['ascii2', 'ascii1']);", + "INSERT INTO listtable(key, col_list_bigint) VALUES (1, [12345679, 12345678]);", + "INSERT INTO listtable(key, col_list_blob) VALUES (1, [0x68656c6c6e, 0x68656c6c6f]);", + "INSERT INTO listtable(key, col_list_boolean) VALUES (1, [true, false]);", + "INSERT INTO listtable(key, col_list_decimal) VALUES (1, [23.4568, 23.4567]);", + "INSERT INTO listtable(key, col_list_double) VALUES (1, [12345678.12345679, 12345678.12345678]);", + "INSERT INTO listtable(key, col_list_float) VALUES (1, [123.13, 123.12]);", + "INSERT INTO listtable(key, col_list_inet) VALUES (1, ['127.0.0.2', '127.0.0.1']);", + "INSERT INTO listtable(key, col_list_int) VALUES (1, [124, 123]);", + "INSERT INTO listtable(key, col_list_text) VALUES (1, ['text2', 'text1']);", + "INSERT INTO listtable(key, col_list_timestamp) VALUES (1, ['2011-02-04T04:05:00+0000', '2011-02-03T04:05:00+0000']);", + "INSERT INTO listtable(key, col_list_timeuuid) VALUES (1, [e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77, e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f]);", + "INSERT INTO listtable(key, col_list_uuid) VALUES (1, [550e8400-e29b-41d4-a716-446655440001, 550e8400-e29b-41d4-a716-446655440000]);", + "INSERT INTO listtable(key, col_list_varchar) VALUES (1, ['varchar2', 'varchar1']);", + "INSERT INTO listtable(key, col_list_varint) VALUES (1, [124, 123]);", + + "INSERT INTO maptable(key, col_map_ascii) VALUES (1, {'ascii1' : 'ascii2'});", + "INSERT INTO maptable(key, col_map_bigint) VALUES (1, {12345678 : 12345679});", + "INSERT INTO maptable(key, col_map_blob) VALUES (1, {0x68656c6c6f : 0x68656c6c6e});", + "INSERT INTO maptable(key, col_map_boolean) VALUES (1, {false : true});", + "INSERT INTO maptable(key, col_map_decimal) VALUES (1, {23.4567 : 23.4568});", + "INSERT INTO maptable(key, col_map_double) VALUES (1, {12345678.12345678 : 12345678.12345679});", + "INSERT INTO maptable(key, col_map_float) VALUES (1, {123.12 : 123.13});", + "INSERT INTO maptable(key, col_map_inet) VALUES (1, {'127.0.0.1' : '127.0.0.2'});", + "INSERT INTO maptable(key, col_map_int) VALUES (1, {123 : 124});", + "INSERT INTO maptable(key, col_map_text) VALUES (1, {'text1' : 'text2'});", + "INSERT INTO maptable(key, col_map_timestamp) VALUES (1, {'2011-02-03T04:05:00+0000' : '2011-02-04T04:05:00+0000'});", + "INSERT INTO maptable(key, col_map_timeuuid) VALUES (1, {e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f : e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77});", + "INSERT INTO maptable(key, col_map_uuid) VALUES (1, {550e8400-e29b-41d4-a716-446655440000 : 550e8400-e29b-41d4-a716-446655440001});", + "INSERT INTO maptable(key, col_map_varchar) VALUES (1, {'varchar1' : 'varchar2'});", + "INSERT INTO maptable(key, col_map_varint) VALUES (1, {123 : 124});", + + "CREATE TABLE countertable (key int primary key, col_counter counter);", + "UPDATE countertable SET col_counter = col_counter + 3 WHERE key = 1;", + }; + + @BeforeClass + public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException, + AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException + { + startCassandra(); + setupDataByCql(statements); + startHadoopCluster(); + } + + @Test + public void testCqlStorageRegularType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("rows"); + //{key: int, + //col_ascii: chararray, + //col_bigint: long, + //col_blob: bytearray, + //col_boolean: bytearray, + //col_decimal: chararray, + //col_double: double, + //col_float: float, + //col_inet: chararray, + //col_int: int, + //col_text: chararray, + //col_timestamp: long, + //col_timeuuid: bytearray, + //col_uuid: chararray, + //col_varchar: chararray, + //col_varint: int} + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), 1); + Assert.assertEquals(t.get(1), "ascii"); + Assert.assertEquals(t.get(2), 12345678L); + Assert.assertEquals(t.get(3), new DataByteArray(Hex.hexToBytes("23446c6c6f"))); + Assert.assertEquals(t.get(4), false); + Assert.assertEquals(t.get(5), "23.4567"); + Assert.assertEquals(t.get(6), 12345678.12345678d); + Assert.assertEquals(t.get(7), 123.12f); + Assert.assertEquals(t.get(8), "127.0.0.1"); + Assert.assertEquals(t.get(9), 123); + Assert.assertEquals(t.get(10), "text"); + Assert.assertEquals(t.get(11), 1296705900000L); + Assert.assertEquals(t.get(12), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array()))); + Assert.assertEquals(t.get(13), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array()))); + Assert.assertEquals(t.get(14), "varchar"); + Assert.assertEquals(t.get(15), 123); + } + + pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + "' USING CqlStorage();"); + it = pig.openIterator("cc_rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), 1); + Assert.assertEquals(t.get(1), 3L); + } + } + + @Test + public void testCqlStorageSetType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("set_rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), 1); + Tuple innerTuple = (Tuple) t.get(1); + Assert.assertEquals(innerTuple.get(0), "ascii1"); + Assert.assertEquals(innerTuple.get(1), "ascii2"); + innerTuple = (Tuple) t.get(2); + Assert.assertEquals(innerTuple.get(0), 12345678L); + Assert.assertEquals(innerTuple.get(1), 12345679L); + innerTuple = (Tuple) t.get(3); + Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e"))); + Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f"))); + innerTuple = (Tuple) t.get(4); + Assert.assertEquals(innerTuple.get(0), false); + Assert.assertEquals(innerTuple.get(1), true); + innerTuple = (Tuple) t.get(5); + Assert.assertEquals(innerTuple.get(0), "23.4567"); + Assert.assertEquals(innerTuple.get(1), "23.4568"); + innerTuple = (Tuple) t.get(6); + Assert.assertEquals(innerTuple.get(0), 12345678.12345678d); + Assert.assertEquals(innerTuple.get(1), 12345678.12345679d); + innerTuple = (Tuple) t.get(7); + Assert.assertEquals(innerTuple.get(0), 123.12f); + Assert.assertEquals(innerTuple.get(1), 123.13f); + innerTuple = (Tuple) t.get(8); + Assert.assertEquals(innerTuple.get(0), "127.0.0.1"); + Assert.assertEquals(innerTuple.get(1), "127.0.0.2"); + innerTuple = (Tuple) t.get(9); + Assert.assertEquals(innerTuple.get(0), 123); + Assert.assertEquals(innerTuple.get(1), 124); + innerTuple = (Tuple) t.get(10); + Assert.assertEquals(innerTuple.get(0), "text1"); + Assert.assertEquals(innerTuple.get(1), "text2"); + innerTuple = (Tuple) t.get(11); + Assert.assertEquals(innerTuple.get(0), 1296705900000L); + Assert.assertEquals(innerTuple.get(1), 1296792300000L); + innerTuple = (Tuple) t.get(12); + Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array()))); + Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array()))); + innerTuple = (Tuple) t.get(13); + Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array()))); + Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array()))); + innerTuple = (Tuple) t.get(14); + Assert.assertEquals(innerTuple.get(0), "varchar1"); + Assert.assertEquals(innerTuple.get(1), "varchar2"); + innerTuple = (Tuple) t.get(15); + Assert.assertEquals(innerTuple.get(0), 123); + Assert.assertEquals(innerTuple.get(1), 124); + } + } + + @Test + public void testCqlStorageListType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("list_rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), 1); + Tuple innerTuple = (Tuple) t.get(1); + Assert.assertEquals(innerTuple.get(1), "ascii1"); + Assert.assertEquals(innerTuple.get(0), "ascii2"); + innerTuple = (Tuple) t.get(2); + Assert.assertEquals(innerTuple.get(1), 12345678L); + Assert.assertEquals(innerTuple.get(0), 12345679L); + innerTuple = (Tuple) t.get(3); + Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6f"))); + Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6e"))); + innerTuple = (Tuple) t.get(4); + Assert.assertEquals(innerTuple.get(1), false); + Assert.assertEquals(innerTuple.get(0), true); + innerTuple = (Tuple) t.get(5); + Assert.assertEquals(innerTuple.get(1), "23.4567"); + Assert.assertEquals(innerTuple.get(0), "23.4568"); + innerTuple = (Tuple) t.get(6); + Assert.assertEquals(innerTuple.get(1), 12345678.12345678d); + Assert.assertEquals(innerTuple.get(0), 12345678.12345679d); + innerTuple = (Tuple) t.get(7); + Assert.assertEquals(innerTuple.get(1), 123.12f); + Assert.assertEquals(innerTuple.get(0), 123.13f); + innerTuple = (Tuple) t.get(8); + Assert.assertEquals(innerTuple.get(1), "127.0.0.1"); + Assert.assertEquals(innerTuple.get(0), "127.0.0.2"); + innerTuple = (Tuple) t.get(9); + Assert.assertEquals(innerTuple.get(1), 123); + Assert.assertEquals(innerTuple.get(0), 124); + innerTuple = (Tuple) t.get(10); + Assert.assertEquals(innerTuple.get(1), "text1"); + Assert.assertEquals(innerTuple.get(0), "text2"); + innerTuple = (Tuple) t.get(11); + Assert.assertEquals(innerTuple.get(1), 1296705900000L); + Assert.assertEquals(innerTuple.get(0), 1296792300000L); + innerTuple = (Tuple) t.get(12); + Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array()))); + Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array()))); + innerTuple = (Tuple) t.get(13); + Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array()))); + Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array()))); + innerTuple = (Tuple) t.get(14); + Assert.assertEquals(innerTuple.get(1), "varchar1"); + Assert.assertEquals(innerTuple.get(0), "varchar2"); + innerTuple = (Tuple) t.get(15); + Assert.assertEquals(innerTuple.get(1), 123); + Assert.assertEquals(innerTuple.get(0), 124); + } + } + + @Test + public void testCqlStorageMapType() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("map_rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), 1); + Tuple innerTuple = (Tuple) ((Tuple) t.get(1)).get(0); + Assert.assertEquals(innerTuple.get(0), "ascii1"); + Assert.assertEquals(innerTuple.get(1), "ascii2"); + innerTuple = (Tuple) ((Tuple) t.get(2)).get(0); + Assert.assertEquals(innerTuple.get(0), 12345678L); + Assert.assertEquals(innerTuple.get(1), 12345679L); + innerTuple = (Tuple) ((Tuple) t.get(3)).get(0); + Assert.assertEquals(innerTuple.get(0), new DataByteArray(Hex.hexToBytes("68656c6c6f"))); + Assert.assertEquals(innerTuple.get(1), new DataByteArray(Hex.hexToBytes("68656c6c6e"))); + innerTuple = (Tuple) ((Tuple) t.get(4)).get(0); + Assert.assertEquals(innerTuple.get(0), false); + Assert.assertEquals(innerTuple.get(1), true); + innerTuple = (Tuple) ((Tuple) t.get(5)).get(0); + Assert.assertEquals(innerTuple.get(0), "23.4567"); + Assert.assertEquals(innerTuple.get(1), "23.4568"); + innerTuple = (Tuple) ((Tuple) t.get(6)).get(0); + Assert.assertEquals(innerTuple.get(0), 12345678.12345678d); + Assert.assertEquals(innerTuple.get(1), 12345678.12345679d); + innerTuple = (Tuple) ((Tuple) t.get(7)).get(0); + Assert.assertEquals(innerTuple.get(0), 123.12f); + Assert.assertEquals(innerTuple.get(1), 123.13f); + innerTuple = (Tuple) ((Tuple) t.get(8)).get(0); + Assert.assertEquals(innerTuple.get(0), "127.0.0.1"); + Assert.assertEquals(innerTuple.get(1), "127.0.0.2"); + innerTuple = (Tuple) ((Tuple) t.get(9)).get(0); + Assert.assertEquals(innerTuple.get(0), 123); + Assert.assertEquals(innerTuple.get(1), 124); + innerTuple = (Tuple) ((Tuple) t.get(10)).get(0); + Assert.assertEquals(innerTuple.get(0), "text1"); + Assert.assertEquals(innerTuple.get(1), "text2"); + innerTuple = (Tuple) ((Tuple) t.get(11)).get(0); + Assert.assertEquals(innerTuple.get(0), 1296705900000L); + Assert.assertEquals(innerTuple.get(1), 1296792300000L); + innerTuple = (Tuple) ((Tuple) t.get(12)).get(0); + Assert.assertEquals(innerTuple.get(0), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array()))); + Assert.assertEquals(innerTuple.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array()))); + innerTuple = (Tuple) ((Tuple) t.get(13)).get(0); + Assert.assertEquals(innerTuple.get(0), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array()))); + Assert.assertEquals(innerTuple.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440001").array()))); + innerTuple = (Tuple) ((Tuple) t.get(14)).get(0); + Assert.assertEquals(innerTuple.get(0), "varchar1"); + Assert.assertEquals(innerTuple.get(1), "varchar2"); + innerTuple = (Tuple) ((Tuple) t.get(15)).get(0); + Assert.assertEquals(innerTuple.get(0), 123); + Assert.assertEquals(innerTuple.get(1), 124); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/cassandra/pig/CqlTableTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java new file mode 100644 index 0000000..785d819 --- /dev/null +++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.pig; + +import java.io.IOException; +import java.nio.charset.CharacterCodingException; +import java.util.Iterator; + +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.thrift.AuthenticationException; +import org.apache.cassandra.thrift.AuthorizationException; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SchemaDisagreementException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class CqlTableTest extends PigTestBase +{ + private static String[] statements = { + "CREATE KEYSPACE cql3ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}", + "USE cql3ks;", + + "CREATE TABLE cqltable (key1 text, key2 int, column1 int, column2 float, primary key(key1, key2))", + "INSERT INTO cqltable (key1, key2, column1, column2) values ('key1', 111, 100, 10.1)", + "CREATE TABLE compactcqltable (key1 text, column1 int, column2 float, primary key(key1)) WITH COMPACT STORAGE", + "INSERT INTO compactcqltable (key1, column1, column2) values ('key1', 100, 10.1)", + + "CREATE TABLE test (a int PRIMARY KEY, b int);", + + "CREATE TABLE moredata (x int PRIMARY KEY, y int);", + "INSERT INTO test (a,b) VALUES (1,1);", + "INSERT INTO test (a,b) VALUES (2,2);", + "INSERT INTO test (a,b) VALUES (3,3);", + "INSERT INTO moredata (x, y) VALUES (4,4);", + "INSERT INTO moredata (x, y) VALUES (5,5);", + "INSERT INTO moredata (x, y) VALUES (6,6);", + + "CREATE TABLE compotable (a int, b int, c text, d text, PRIMARY KEY (a,b,c));", + "INSERT INTO compotable (a, b , c , d ) VALUES ( 1,1,'One','match');", + "INSERT INTO compotable (a, b , c , d ) VALUES ( 2,2,'Two','match');", + "INSERT INTO compotable (a, b , c , d ) VALUES ( 3,3,'Three','match');", + "INSERT INTO compotable (a, b , c , d ) VALUES ( 4,4,'Four','match');", + + "create table compmore (id int PRIMARY KEY, x int, y int, z text, data text);", + "INSERT INTO compmore (id, x, y, z,data) VALUES (1,5,6,'Fix','nomatch');", + "INSERT INTO compmore (id, x, y, z,data) VALUES (2,6,5,'Sive','nomatch');", + "INSERT INTO compmore (id, x, y, z,data) VALUES (3,7,7,'Seven','match');", + "INSERT INTO compmore (id, x, y, z,data) VALUES (4,8,8,'Eight','match');", + "INSERT INTO compmore (id, x, y, z,data) VALUES (5,9,10,'Ninen','nomatch');", + + "CREATE TABLE collectiontable(m text PRIMARY KEY, n map);", + "UPDATE collectiontable SET n['key1'] = 'value1' WHERE m = 'book1';", + "UPDATE collectiontable SET n['key2'] = 'value2' WHERE m = 'book2';", + "UPDATE collectiontable SET n['key3'] = 'value3' WHERE m = 'book3';", + "UPDATE collectiontable SET n['key4'] = 'value4' WHERE m = 'book4';", + }; + + @BeforeClass + public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException, + AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException + { + startCassandra(); + setupDataByCql(statements); + startHadoopCluster(); + } + + @Test + public void testCqlStorageSchema() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0).toString(), "key1"); + Assert.assertEquals(t.get(1), 111); + Assert.assertEquals(t.get(2), 100); + Assert.assertEquals(t.get(3), 10.1f); + Assert.assertEquals(4, t.size()); + } + + pig.registerQuery("rows = LOAD 'cql://cql3ks/compactcqltable?" + defaultParameters + "' USING CqlStorage();"); + it = pig.openIterator("rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0).toString(), "key1"); + Assert.assertEquals(t.get(1), 100); + Assert.assertEquals(t.get(2), 10.1f); + Assert.assertEquals(3, t.size()); + } + } + + @Test + public void testCqlStorageSingleKeyTable() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.setBatchOn(); + pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + "' USING CqlStorage();"); + pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);"); + pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test?" + defaultParameters + "&output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING CqlStorage();"); + pig.executeBatch(); + //(5,5) + //(6,6) + //(4,4) + //(2,2) + //(3,3) + //(1,1) + pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("result"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), t.get(1)); + } + } + + @Test + public void testCqlStorageCompositeKeyTable() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.setBatchOn(); + pig.registerQuery("moredata= LOAD 'cql://cql3ks/compmore?" + defaultParameters + "' USING CqlStorage();"); + pig.registerQuery("insertformat = FOREACH moredata GENERATE TOTUPLE (TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);"); + pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/compotable?" + defaultParameters + "&output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING CqlStorage();"); + pig.executeBatch(); + + //(5,6,Fix,nomatch) + //(3,3,Three,match) + //(1,1,One,match) + //(2,2,Two,match) + //(7,7,Seven,match) + //(8,8,Eight,match) + //(6,5,Sive,nomatch) + //(4,4,Four,match) + //(9,10,Ninen,nomatch) + pig.registerQuery("result= LOAD 'cql://cql3ks/compotable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("result"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(3), "match"); + } + } + + @Test + public void testCqlStorageCollectionColumnTable() + throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException + { + pig.setBatchOn(); + pig.registerQuery("collectiontable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();"); + pig.registerQuery("recs= FOREACH collectiontable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), TOTUPLE('n', 'nn')));"); + pig.registerQuery("STORE recs INTO 'cql://cql3ks/collectiontable?" + defaultParameters + "&output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' USING CqlStorage();"); + pig.executeBatch(); + + //(book2,((m,mm),(n,nn))) + //(book3,((m,mm),(n,nn))) + //(book4,((m,mm),(n,nn))) + //(book1,((m,mm),(n,nn))) + pig.registerQuery("result= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();"); + Iterator it = pig.openIterator("result"); + if (it.hasNext()) { + Tuple t = it.next(); + Tuple t1 = (Tuple) t.get(1); + Assert.assertEquals(t1.size(), 2); + Tuple element1 = (Tuple) t1.get(0); + Tuple element2 = (Tuple) t1.get(1); + Assert.assertEquals(element1.get(0), "m"); + Assert.assertEquals(element1.get(1), "mm"); + Assert.assertEquals(element2.get(0), "n"); + Assert.assertEquals(element2.get(1), "nn"); + } + } + + @Test + public void testCassandraStorageSchema() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException + { + //results: (key1,{((111,),),((111,column1),100),((111,column2),10.1)}) + pig.registerQuery("rows = LOAD 'cassandra://cql3ks/cqltable?" + defaultParameters + "' USING CassandraStorage();"); + + //schema: {key: chararray,columns: {(name: (),value: bytearray)}} + Iterator it = pig.openIterator("rows"); + if (it.hasNext()) { + Tuple t = it.next(); + String rowKey = t.get(0).toString(); + Assert.assertEquals(rowKey, "key1"); + DataBag columns = (DataBag) t.get(1); + Iterator iter = columns.iterator(); + int i = 0; + while(iter.hasNext()) + { + i++; + Tuple column = (Tuple) iter.next(); + if (i==1) + { + Assert.assertEquals(((Tuple) column.get(0)).get(0), 111); + Assert.assertEquals(((Tuple) column.get(0)).get(1), ""); + Assert.assertEquals(column.get(1).toString(), ""); + } + if (i==2) + { + Assert.assertEquals(((Tuple) column.get(0)).get(0), 111); + Assert.assertEquals(((Tuple) column.get(0)).get(1), "column1"); + Assert.assertEquals(column.get(1), 100); + } + if (i==3) + { + Assert.assertEquals(((Tuple) column.get(0)).get(0), 111); + Assert.assertEquals(((Tuple) column.get(0)).get(1), "column2"); + Assert.assertEquals(column.get(1), 10.1f); + } + } + Assert.assertEquals(3, columns.size()); + } + + //results: (key1,(column1,100),(column2,10.1)) + pig.registerQuery("compact_rows = LOAD 'cassandra://cql3ks/compactcqltable?" + defaultParameters + "' USING CassandraStorage();"); + + //schema: {key: chararray,column1: (name: chararray,value: int),column2: (name: chararray,value: float)} + it = pig.openIterator("compact_rows"); + if (it.hasNext()) { + Tuple t = it.next(); + String rowKey = t.get(0).toString(); + Assert.assertEquals(rowKey, "key1"); + Tuple column = (Tuple) t.get(1); + Assert.assertEquals(column.get(0), "column1"); + Assert.assertEquals(column.get(1), 100); + column = (Tuple) t.get(2); + Assert.assertEquals(column.get(0), "column2"); + Assert.assertEquals(column.get(1), 10.1f); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/cassandra/pig/PigTestBase.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java new file mode 100644 index 0000000..ea06b8c --- /dev/null +++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.pig; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.CharacterCodingException; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cli.CliMain; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.SyntaxException; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.thrift.Cassandra; +import org.apache.cassandra.thrift.Compression; +import org.apache.cassandra.thrift.ConsistencyLevel; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.SchemaDisagreementException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.impl.PigContext; +import org.apache.pig.test.MiniCluster; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; + +public class PigTestBase extends SchemaLoader +{ + protected static EmbeddedCassandraService cassandra; + protected static Configuration conf; + protected static MiniCluster cluster; + protected static PigServer pig; + protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner"; + + @AfterClass + public static void oneTimeTearDown() throws Exception { + cluster.shutDown(); + } + + @Before + public void beforeTest() throws Exception { + pig = new PigServer(new PigContext(ExecType.LOCAL, ConfigurationUtil.toProperties(conf))); + PigContext.initializeImportList("org.apache.cassandra.hadoop.pig"); + } + + @After + public void tearDown() throws Exception { + pig.shutdown(); + } + + protected static Cassandra.Client getClient() throws TTransportException + { + TTransport tr = new TFramedTransport(new TSocket("localhost", DatabaseDescriptor.getRpcPort())); + TProtocol proto = new TBinaryProtocol(tr); + Cassandra.Client client = new Cassandra.Client(proto); + tr.open(); + return client; + } + + protected static void startCassandra() throws IOException + { + Schema.instance.clear(); // Schema are now written on disk and will be reloaded + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + } + + protected static void startHadoopCluster() + { + cluster = MiniCluster.buildCluster(); + conf = cluster.getConfiguration(); + } + + protected AbstractType parseType(String type) throws IOException + { + try + { + return TypeParser.parse(type); + } + catch (ConfigurationException e) + { + throw new IOException(e); + } + catch (SyntaxException e) + { + throw new IOException(e); + } + } + + protected static void setupDataByCli(String[] statements) throws CharacterCodingException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException + { + // new error/output streams for CliSessionState + ByteArrayOutputStream errStream = new ByteArrayOutputStream(); + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + + // checking if we can connect to the running cassandra node on localhost + CliMain.connect("127.0.0.1", 9170); + + // setting new output stream + CliMain.sessionState.setOut(new PrintStream(outStream)); + CliMain.sessionState.setErr(new PrintStream(errStream)); + + // re-creating keyspace for tests + try + { + // dropping in case it exists e.g. could be left from previous run + CliMain.processStatement("drop keyspace thriftKs;"); + } + catch (Exception e) + { + } + + for (String statement : statements) + { + errStream.reset(); + System.out.println("Executing statement: " + statement); + CliMain.processStatement(statement); + String result = outStream.toString(); + System.out.println("result: " + result); + outStream.reset(); // reset stream so we have only output from next statement all the time + errStream.reset(); // no errors to the end user. + } + } + + protected static void setupDataByCql(String[] statements) throws InvalidRequestException, UnavailableException, TimedOutException, TException + { + Cassandra.Client client = getClient(); + // re-creating keyspace for tests + try + { + // dropping in case it exists e.g. could be left from previous run + client.execute_cql3_query(ByteBufferUtil.bytes("DROP KEYSPACE cql3ks"), Compression.NONE, ConsistencyLevel.ONE); + } + catch (Exception e) + { + } + + for (String statement : statements) + { + try + { + System.out.println("Executing statement: " + statement); + client.execute_cql3_query(ByteBufferUtil.bytes(statement), Compression.NONE, ConsistencyLevel.ONE); + } + catch (SchemaDisagreementException e) + { + Assert.fail(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ef33f954/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java ---------------------------------------------------------------------- diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java new file mode 100644 index 0000000..7bccc23 --- /dev/null +++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyDataTypeTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.pig; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import java.io.IOException; +import java.nio.charset.CharacterCodingException; +import java.util.Iterator; + +import org.apache.cassandra.db.marshal.TimeUUIDType; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.thrift.AuthenticationException; +import org.apache.cassandra.thrift.AuthorizationException; +import org.apache.cassandra.thrift.InvalidRequestException; +import org.apache.cassandra.thrift.NotFoundException; +import org.apache.cassandra.thrift.TimedOutException; +import org.apache.cassandra.thrift.UnavailableException; +import org.apache.cassandra.utils.Hex; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.Tuple; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class ThriftColumnFamilyDataTypeTest extends PigTestBase +{ + //AsciiType + //LongType + //BytesType + //BooleanType + //CounterColumnType + //DecimalType + //DoubleType + //FloatType + //InetAddressType + //Int32Type + //UTF8Type + //DateType + //UUIDType + //IntegerType + //TimeUUIDType + //IntegerType + //LexicalUUIDType + private static String[] statements = { + "create keyspace thriftKs with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and" + + " strategy_options={replication_factor:1};", + "use thriftKs;", + + "create column family SomeApp " + + " with comparator = UTF8Type " + + " and default_validation_class = UTF8Type " + + " and key_validation_class = UTF8Type " + + " and column_metadata = [" + + "{column_name: col_ascii, validation_class: AsciiType}, " + + "{column_name: col_long, validation_class: LongType}, " + + "{column_name: col_bytes, validation_class: BytesType}, " + + "{column_name: col_boolean, validation_class: BooleanType}, " + + "{column_name: col_decimal, validation_class: DecimalType}, " + + "{column_name: col_double, validation_class: DoubleType}, " + + "{column_name: col_float, validation_class: FloatType}," + + "{column_name: col_inetaddress, validation_class: InetAddressType}, " + + "{column_name: col_int32, validation_class: Int32Type}, " + + "{column_name: col_uft8, validation_class: UTF8Type}, " + + "{column_name: col_date, validation_class: DateType}, " + + "{column_name: col_uuid, validation_class: UUIDType}, " + + "{column_name: col_integer, validation_class: IntegerType}, " + + "{column_name: col_timeuuid, validation_class: TimeUUIDType}, " + + "{column_name: col_lexical_uuid, validation_class: LexicalUUIDType}, " + + "]; ", + + "set SomeApp['foo']['col_ascii'] = 'ascii';", + "set SomeApp['foo']['col_boolean'] = false;", + "set SomeApp['foo']['col_bytes'] = 'DEADBEEF';", + "set SomeApp['foo']['col_date'] = '2011-02-03T04:05:00+0000';", + "set SomeApp['foo']['col_decimal'] = '23.345';", + "set SomeApp['foo']['col_double'] = '2.7182818284590451';", + "set SomeApp['foo']['col_float'] = '23.45';", + "set SomeApp['foo']['col_inetaddress'] = '127.0.0.1';", + "set SomeApp['foo']['col_int32'] = 23;", + "set SomeApp['foo']['col_integer'] = 12345;", + "set SomeApp['foo']['col_long'] = 12345678;", + "set SomeApp['foo']['col_lexical_uuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77';", + "set SomeApp['foo']['col_timeuuid'] = 'e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f';", + "set SomeApp['foo']['col_uft8'] = 'hello';", + "set SomeApp['foo']['col_uuid'] = '550e8400-e29b-41d4-a716-446655440000';", + + "create column family CC with " + + "key_validation_class = UTF8Type and " + + "default_validation_class=CounterColumnType " + + "and comparator=UTF8Type;", + + "incr CC['chuck']['kick'];", + "incr CC['chuck']['kick'];", + "incr CC['chuck']['kick'];" + }; + + @BeforeClass + public static void setup() throws TTransportException, IOException, InterruptedException, ConfigurationException, + AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, CharacterCodingException, ClassNotFoundException, NoSuchFieldException, IllegalAccessException, InstantiationException + { + startCassandra(); + setupDataByCli(statements); + startHadoopCluster(); + } + + @Test + public void testCassandraStorageDataType() throws IOException, ClassNotFoundException, TException, TimedOutException, NotFoundException, InvalidRequestException, NoSuchFieldException, UnavailableException, IllegalAccessException, InstantiationException + { + pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + defaultParameters + "' USING CassandraStorage();"); + + //{key: chararray, col_ascii: (name: chararray,value: chararray), + //col_boolean: (name: chararray,value: bytearray), + //col_bytes: (name: chararray,value: bytearray), + //col_date: (name: chararray,value: long), + //col_decimal: (name: chararray,value: chararray), + //col_double: (name: chararray,value: double), + //col_float: (name: chararray,value: float), + //col_inetaddress: (name: chararray,value: chararray), + //col_int32: (name: chararray,value: int), + //col_integer: (name: chararray,value: int), + //col_lexical_uuid: (name: chararray,value: chararray), + //col_long: (name: chararray,value: long), + //col_timeuuid: (name: chararray,value: bytearray), + //col_uft8: (name: chararray,value: chararray), + //col_uuid: (name: chararray,value: chararray), + //columns: {(name: chararray,value: chararray)}} + Iterator it = pig.openIterator("rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), "foo"); + Tuple column = (Tuple) t.get(1); + Assert.assertEquals(column.get(1), "ascii"); + column = (Tuple) t.get(2); + Assert.assertEquals(column.get(1), false); + column = (Tuple) t.get(3); + Assert.assertEquals(column.get(1), new DataByteArray(Hex.hexToBytes("DEADBEEF"))); + column = (Tuple) t.get(4); + Assert.assertEquals(column.get(1), 1296705900000L); + column = (Tuple) t.get(5); + Assert.assertEquals(column.get(1), "23.345"); + column = (Tuple) t.get(6); + Assert.assertEquals(column.get(1), 2.7182818284590451d); + column = (Tuple) t.get(7); + Assert.assertEquals(column.get(1), 23.45f); + column = (Tuple) t.get(8); + Assert.assertEquals(column.get(1), "127.0.0.1"); + column = (Tuple) t.get(9); + Assert.assertEquals(column.get(1), 23); + column = (Tuple) t.get(10); + Assert.assertEquals(column.get(1), 12345); + column = (Tuple) t.get(11); + Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f77").array()))); + column = (Tuple) t.get(12); + Assert.assertEquals(column.get(1), 12345678L); + column = (Tuple) t.get(13); + Assert.assertEquals(column.get(1), new DataByteArray((TimeUUIDType.instance.fromString("e23f450f-53a6-11e2-7f7f-7f7f7f7f7f7f").array()))); + column = (Tuple) t.get(14); + Assert.assertEquals(column.get(1), "hello"); + column = (Tuple) t.get(15); + Assert.assertEquals(column.get(1), new DataByteArray((UUIDType.instance.fromString("550e8400-e29b-41d4-a716-446655440000").array()))); + } + + pig.registerQuery("cc_rows = LOAD 'cassandra://thriftKs/CC?" + defaultParameters + "' USING CassandraStorage();"); + + //(chuck,{(kick,3)}) + it = pig.openIterator("cc_rows"); + if (it.hasNext()) { + Tuple t = it.next(); + Assert.assertEquals(t.get(0), "chuck"); + DataBag columns = (DataBag) t.get(1); + Iterator iter = columns.iterator(); + if(iter.hasNext()) + { + Tuple column = iter.next(); + Assert.assertEquals(column.get(0), "kick"); + Assert.assertEquals(column.get(1), 3L); + } + } + } +}