cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/4] cassandra git commit: Remove Thrift dependencies in bundled tools
Date Tue, 05 May 2015 20:56:58 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 5b6154531 -> f698cc228


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 22921e2..51e5e3d 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,33 +18,27 @@
 package org.apache.cassandra.tools;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
 import java.util.*;
 
-import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.commons.cli.*;
 
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.SSLOptions;
+import javax.net.ssl.SSLContext;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
 import org.apache.cassandra.utils.OutputHandler;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
 
 public class BulkLoader
 {
@@ -54,7 +48,7 @@ public class BulkLoader
     private static final String NOPROGRESS_OPTION  = "no-progress";
     private static final String IGNORE_NODES_OPTION  = "ignore";
     private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
-    private static final String RPC_PORT_OPTION = "port";
+    private static final String NATIVE_PORT_OPTION = "port";
     private static final String USER_OPTION = "username";
     private static final String PASSWD_OPTION = "password";
     private static final String THROTTLE_MBITS = "throttle";
@@ -82,13 +76,13 @@ public class BulkLoader
                 options.directory,
                 new ExternalClient(
                         options.hosts,
-                        options.rpcPort,
+                        options.nativePort,
                         options.user,
                         options.passwd,
-                        options.transportFactory,
                         options.storagePort,
                         options.sslStoragePort,
-                        options.serverEncOptions),
+                        options.serverEncOptions,
+                        buildSSLOptions((EncryptionOptions.ClientEncryptionOptions)options.encOptions)),
                 handler,
                 options.connectionsPerHost);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
@@ -154,8 +148,13 @@ public class BulkLoader
             start = lastTime = System.nanoTime();
         }
 
-        public void onSuccess(StreamState finalState) {}
-        public void onFailure(Throwable t) {}
+        public void onSuccess(StreamState finalState)
+        {
+        }
+
+        public void onFailure(Throwable t)
+        {
+        }
 
         public synchronized void handleStreamEvent(StreamEvent event)
         {
@@ -254,14 +253,27 @@ public class BulkLoader
         }
     }
 
-    static class ExternalClient extends SSTableLoader.Client
+    private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions)
+    {
+
+        if (!clientEncryptionOptions.enabled)
+            return null;
+
+        SSLContext sslContext;
+        try
+        {
+            sslContext = SSLFactory.createSSLContext(clientEncryptionOptions, true);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not create SSL Context.", e);
+        }
+
+        return new SSLOptions(sslContext, clientEncryptionOptions.cipher_suites);
+    }
+
+    static class ExternalClient extends NativeSSTableLoaderClient
     {
-        private final Map<String, CFMetaData> knownCfs = new HashMap<>();
-        private final Set<InetAddress> hosts;
-        private final int rpcPort;
-        private final String user;
-        private final String passwd;
-        private final ITransportFactory transportFactory;
         private final int storagePort;
         private final int sslStoragePort;
         private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
@@ -270,103 +282,22 @@ public class BulkLoader
                               int port,
                               String user,
                               String passwd,
-                              ITransportFactory transportFactory,
                               int storagePort,
                               int sslStoragePort,
-                              EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions)
+                              EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions,
+                              SSLOptions sslOptions)
         {
-            super();
-            this.hosts = hosts;
-            this.rpcPort = port;
-            this.user = user;
-            this.passwd = passwd;
-            this.transportFactory = transportFactory;
+            super(hosts, port, user, passwd, sslOptions);
             this.storagePort = storagePort;
             this.sslStoragePort = sslStoragePort;
             this.serverEncOptions = serverEncryptionOptions;
         }
 
         @Override
-        public void init(String keyspace)
-        {
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    // Query endpoint to ranges map and schemas from thrift
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort,
this.user, this.passwd, this.transportFactory);
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : client.describe_ring(keyspace))
-                    {
-                        Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token),
tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name
= '%s'",
-                                                   SystemKeyspace.NAME,
-                                                   LegacySchemaTables.COLUMNFAMILIES,
-                                                   keyspace);
-                    CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery),
Compression.NONE, ConsistencyLevel.ONE);
-
-
-                    for (CqlRow row : cfRes.rows)
-                    {
-                        String columnFamily = UTF8Type.instance.getString(row.columns.get(1).bufferForName());
-                        String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name
= '%s' AND columnfamily_name = '%s'",
-                                                            SystemKeyspace.NAME,
-                                                            LegacySchemaTables.COLUMNS,
-                                                            keyspace,
-                                                            columnFamily);
-                        CqlResult columnsRes = client.execute_cql3_query(ByteBufferUtil.bytes(columnsQuery),
Compression.NONE, ConsistencyLevel.ONE);
-
-                        CFMetaData metadata = ThriftConversion.fromThriftCqlRow(row, columnsRes);
-                        knownCfs.put(metadata.cfName, metadata);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ",
e);
-                }
-            }
-        }
-
-        @Override
         public StreamConnectionFactory getConnectionFactory()
         {
             return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions,
false);
         }
-
-        @Override
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            return knownCfs.get(cfName);
-        }
-
-        private static Cassandra.Client createThriftClient(String host, int port, String
user, String passwd, ITransportFactory transportFactory) throws Exception
-        {
-            TTransport trans = transportFactory.openTransport(host, port);
-            TProtocol protocol = new TBinaryProtocol(trans);
-            Cassandra.Client client = new Cassandra.Client(protocol);
-            if (user != null && passwd != null)
-            {
-                Map<String, String> credentials = new HashMap<>();
-                credentials.put(PasswordAuthenticator.USERNAME_KEY, user);
-                credentials.put(PasswordAuthenticator.PASSWORD_KEY, passwd);
-                AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
-                client.login(authenticationRequest);
-            }
-            return client;
-        }
     }
 
     static class LoaderOptions
@@ -376,13 +307,12 @@ public class BulkLoader
         public boolean debug;
         public boolean verbose;
         public boolean noProgress;
-        public int rpcPort = 9160;
+        public int nativePort = 9042;
         public String user;
         public String passwd;
         public int throttle = 0;
         public int storagePort;
         public int sslStoragePort;
-        public ITransportFactory transportFactory = new TFramedTransportFactory();
         public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
         public int connectionsPerHost = 1;
         public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
@@ -438,8 +368,8 @@ public class BulkLoader
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
 
-                if (cmd.hasOption(RPC_PORT_OPTION))
-                    opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION));
+                if (cmd.hasOption(NATIVE_PORT_OPTION))
+                    opts.nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
 
                 if (cmd.hasOption(USER_OPTION))
                     opts.user = cmd.getOptionValue(USER_OPTION);
@@ -558,13 +488,6 @@ public class BulkLoader
                     opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
                 }
 
-                if (cmd.hasOption(TRANSPORT_FACTORY))
-                {
-                    ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
-                    configureTransportFactory(transportFactory, opts);
-                    opts.transportFactory = transportFactory;
-                }
-
                 return opts;
             }
             catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -574,50 +497,6 @@ public class BulkLoader
             }
         }
 
-        private static ITransportFactory getTransportFactory(String transportFactory)
-        {
-            try
-            {
-                Class<?> factory = Class.forName(transportFactory);
-                if (!ITransportFactory.class.isAssignableFrom(factory))
-                    throw new IllegalArgumentException(String.format("transport factory '%s'
" +
-                            "not derived from ITransportFactory", transportFactory));
-                return (ITransportFactory) factory.newInstance();
-            }
-            catch (Exception e)
-            {
-                throw new IllegalArgumentException(String.format("Cannot create a transport
factory '%s'.", transportFactory), e);
-            }
-        }
-
-        private static void configureTransportFactory(ITransportFactory transportFactory,
LoaderOptions opts)
-        {
-            Map<String, String> options = new HashMap<>();
-            // If the supplied factory supports the same set of options as our SSL impl,
set those 
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
-                options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
-                options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
-                options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
-                options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites));
-
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
-                    && opts.encOptions.require_client_auth)
-                options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
-                    && opts.encOptions.require_client_auth)
-                options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password);
-
-            // Now check if any of the factory's supported options are set as system properties
-            for (String optionKey : transportFactory.supportedOptions())
-                if (System.getProperty(optionKey) != null)
-                    options.put(optionKey, System.getProperty(optionKey));
-
-            transportFactory.setOptions(options);
-        }
-
         private static void errorMsg(String msg, CmdLineOptions options)
         {
             System.err.println(msg);
@@ -633,7 +512,7 @@ public class BulkLoader
             options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
             options.addOption("i",  IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma
separated) list of nodes");
             options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required.
try to connect to these hosts (comma separated) initially for ring information");
-            options.addOption("p",  RPC_PORT_OPTION, "rpc port", "port used for rpc (default
9160)");
+            options.addOption("p",  NATIVE_PORT_OPTION, "rpc port", "port used for native
connection (default 9042)");
             options.addOption("t",  THROTTLE_MBITS, "throttle", "throttle speed in Mbits
(default unlimited)");
             options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
             options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
new file mode 100644
index 0000000..1ef686c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.schema.LegacySchemaTables;
+
+public class NativeSSTableLoaderClient extends SSTableLoader.Client
+{
+    protected final Map<String, CFMetaData> tables;
+    private final Collection<InetAddress> hosts;
+    private final int port;
+    private final String username;
+    private final String password;
+    private final SSLOptions sslOptions;
+
+    public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String
username, String password, SSLOptions sslOptions)
+    {
+        super();
+        this.tables = new HashMap<>();
+        this.hosts = hosts;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+        this.sslOptions = sslOptions;
+    }
+
+    public void init(String keyspace)
+    {
+        Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port);
+        if (sslOptions != null)
+            builder.withSSL(sslOptions);
+        if (username != null && password != null)
+            builder = builder.withCredentials(username, password);
+
+        try (Cluster cluster = builder.build())
+        {
+            Session session = cluster.connect();
+            Metadata metadata = cluster.getMetadata();
+
+            setPartitioner(metadata.getPartitioner());
+
+            Set<TokenRange> tokenRanges = metadata.getTokenRanges();
+
+            Token.TokenFactory tokenFactory = getPartitioner().getTokenFactory();
+
+            for (TokenRange tokenRange : tokenRanges)
+            {
+                Set<Host> endpoints = metadata.getReplicas(keyspace, tokenRange);
+                Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()),
+                                                 tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
+                for (Host endpoint : endpoints)
+                    addRangeForEndpoint(range, endpoint.getAddress());
+            }
+
+            tables.putAll(fetchTablesMetadata(keyspace, session));
+        }
+    }
+
+    public CFMetaData getTableMetadata(String tableName)
+    {
+        return tables.get(tableName);
+    }
+
+    @Override
+    public void setTableMetadata(CFMetaData cfm)
+    {
+        tables.put(cfm.cfName, cfm);
+    }
+
+    private static Map<String, CFMetaData> fetchTablesMetadata(String keyspace, Session
session)
+    {
+        Map<String, CFMetaData> tables = new HashMap<>();
+
+        String query = String.format("SELECT columnfamily_name, cf_id, type, comparator,
subcomparator FROM %s.%s WHERE keyspace_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNFAMILIES,
+                                     keyspace);
+
+        for (Row row : session.execute(query))
+        {
+            String name = row.getString("columnfamily_name");
+            UUID id = row.getUUID("cf_id");
+            ColumnFamilyType type = ColumnFamilyType.valueOf(row.getString("type"));
+            AbstractType rawComparator = TypeParser.parse(row.getString("comparator"));
+            AbstractType subComparator = row.isNull("subcomparator")
+                                       ? null
+                                       : TypeParser.parse(row.getString("subcomparator"));
+            boolean isDense = row.getBool("is_dense");
+            CellNameType comparator = CellNames.fromAbstractType(CFMetaData.makeRawAbstractType(rawComparator,
subComparator),
+                                                                 isDense);
+
+            tables.put(name, new CFMetaData(keyspace, name, type, comparator, id));
+        }
+
+        return tables;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 26f9f68..72fdd5a 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.thrift.TException;
 import org.junit.Assert;
@@ -70,6 +69,11 @@ public class CqlTableTest extends PigTestBase
             "UPDATE collectiontable SET n['key2'] = 'value2' WHERE m = 'book2';",
             "UPDATE collectiontable SET n['key3'] = 'value3' WHERE m = 'book3';",
             "UPDATE collectiontable SET n['key4'] = 'value4' WHERE m = 'book4';",
+            "CREATE TABLE nulltable(m text PRIMARY KEY, n map<text, text>);",
+            "UPDATE nulltable SET n['key1'] = 'value1' WHERE m = 'book1';",
+            "UPDATE nulltable SET n['key2'] = 'value2' WHERE m = 'book2';",
+            "UPDATE nulltable SET n['key3'] = 'value3' WHERE m = 'book3';",
+            "UPDATE nulltable SET n['key4'] = 'value4' WHERE m = 'book4';",
     };
 
     @BeforeClass
@@ -229,65 +233,32 @@ public class CqlTableTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageSchema() throws IOException
+    public void testCqlNativeStorageNullTuples() throws IOException
     {
-        //results: (key1,{((111,),),((111,column1),100),((111,column2),10.1)})
-        pig.registerQuery("rows = LOAD 'cassandra://cql3ks/cqltable?" + defaultParameters
+ "' USING CassandraStorage();");
-
-        //schema: {key: chararray,columns: {(name: (),value: bytearray)}}
-        Iterator<Tuple> it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            String rowKey =  t.get(0).toString();
-            Assert.assertEquals(rowKey, "key1");
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            int i = 0;
-            while (iter.hasNext())
-            {
-                i++;
-                Tuple column = iter.next();
-                if (i==1)
-                {
-                    Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
-                    Assert.assertEquals(((Tuple) column.get(0)).get(1), "");
-                    Assert.assertEquals(column.get(1).toString(), "");
-                }
-                if (i==2)
-                {
-                    Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
-                    Assert.assertEquals(((Tuple) column.get(0)).get(1), "column1");
-                    Assert.assertEquals(column.get(1), 100);
-                }
-                if (i==3)
-                {
-                    Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
-                    Assert.assertEquals(((Tuple) column.get(0)).get(1), "column2");
-                    Assert.assertEquals(column.get(1), 10.1f);
-                }
-            }
-            Assert.assertEquals(3, columns.size());
-        }
-        else
-        {
-            Assert.fail("Can't fetch any data");
-        }
+        //input_cql=select * from collectiontable where token(m) > ? and token(m) <=
?
+        NullTupleTest("nulltable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters
+ nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
+    }
 
-        //results: (key1,(column1,100),(column2,10.1))
-        pig.registerQuery("compact_rows = LOAD 'cassandra://cql3ks/compactcqltable?" + defaultParameters
+ "' USING CassandraStorage();");
+    private void NullTupleTest(String initialQuery) throws IOException
+    {
+        pig.setBatchOn();
+        pig.registerQuery(initialQuery);
+        pig.registerQuery("recs= FOREACH nulltable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map',
TOTUPLE('m', null), TOTUPLE('n', null)));");
+        pig.registerQuery("STORE recs INTO 'cql://cql3ks/nulltable?" + defaultParameters
+ nativeParameters + "&output_query=update+cql3ks.nulltable+set+n+%3D+%3F' USING CqlNativeStorage();");
+        pig.executeBatch();
 
-        //schema: {key: chararray,column1: (name: chararray,value: int),column2: (name: chararray,value:
float)}
-        it = pig.openIterator("compact_rows");
+        pig.registerQuery("result= LOAD 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters
+ "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F'
USING CqlNativeStorage();");
+        Iterator<Tuple> it = pig.openIterator("result");
         if (it.hasNext()) {
             Tuple t = it.next();
-            String rowKey =  t.get(0).toString();
-            Assert.assertEquals(rowKey, "key1");
-            Tuple column = (Tuple) t.get(1);
-            Assert.assertEquals(column.get(0), "column1");
-            Assert.assertEquals(column.get(1), 100);
-            column = (Tuple) t.get(2);
-            Assert.assertEquals(column.get(0), "column2");
-            Assert.assertEquals(column.get(1), 10.1f);
+            Tuple t1 = (Tuple) t.get(1);
+            Assert.assertEquals(t1.size(), 2);
+            Tuple element1 = (Tuple) t1.get(0);
+            Tuple element2 = (Tuple) t1.get(1);
+            Assert.assertEquals(element1.get(0), "m");
+            Assert.assertEquals(element1.get(1), "");
+            Assert.assertEquals(element2.get(0), "n");
+            Assert.assertEquals(element2.get(1), "");
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 850f46d..6525527 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -92,16 +92,19 @@ public class CQLSSTableWriterTest
 
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 
@@ -251,16 +254,19 @@ public class CQLSSTableWriterTest
 
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index b245994..4a51fbd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -83,16 +83,19 @@ public class SSTableLoaderTest
 
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 


Mime
View raw message