cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ifesdj...@apache.org
Subject [5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Thu, 29 Jun 2017 16:05:33 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2074b89
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2074b89
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2074b89

Branch: refs/heads/cassandra-3.11
Commit: a2074b890c32f1a52c4852549db92444a27ef4a7
Parents: 9562b9b a1baead
Author: Alex Petrov <oleksandr.petrov@gmail.com>
Authored: Thu Jun 29 18:02:27 2017 +0200
Committer: Alex Petrov <oleksandr.petrov@gmail.com>
Committed: Thu Jun 29 18:02:27 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/io/sstable/CQLSSTableWriter.java  | 61 +++++++++++++-------
 .../io/sstable/CQLSSTableWriterTest.java        | 51 +++++++++++++++-
 3 files changed, 90 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2074b89/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 88aa1ef,c5179e7..bab91e7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -5,45 -4,7 +5,46 @@@ Merged from 3.0
   * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
  
  
 -3.0.14
 +3.11.0
++ * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)
 + * Replace string comparison with regex/number checks in MessagingService test (CASSANDRA-13216)
 + * Fix formatting of duration columns in CQLSH (CASSANDRA-13549)
 + * Fix the problem with duplicated rows when using paging with SASI (CASSANDRA-13302)
 + * Allow CONTAINS statements filtering on the partition key and it’s parts (CASSANDRA-13275)
 + * Fall back to even ranges calculation in clusters with vnodes when tokens are distributed
unevenly (CASSANDRA-13229)
 + * Fix duration type validation to prevent overflow (CASSANDRA-13218)
 + * Forbid unsupported creation of SASI indexes over partition key columns (CASSANDRA-13228)
 + * Reject multiple values for a key in CQL grammar. (CASSANDRA-13369)
 + * UDA fails without input rows (CASSANDRA-13399)
 + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188)
 + * V5 protocol flags decoding broken (CASSANDRA-13443)
 + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422)
 + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329)
 + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962)
 + * Add charset to Analyser input stream (CASSANDRA-13151)
 + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
 + * cdc column addition strikes again (CASSANDRA-13382)
 + * Fix static column indexes (CASSANDRA-13277)
 + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
 + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
 + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
 + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317)
 + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366)
 + * Support unaligned memory access for AArch64 (CASSANDRA-13326)
 + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915).
 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174)
 + * Obfuscate password in stress-graphs (CASSANDRA-12233)
 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 + * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 + * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 + * Address message coalescing regression (CASSANDRA-12676)
 + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835)
 +Merged from 3.0:
   * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
   * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter
(CASSANDRA-13004)
   * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2074b89/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index a195235,39f7339..9e42101
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@@ -21,43 -21,30 +21,49 @@@ import java.io.Closeable
  import java.io.File;
  import java.io.IOException;
  import java.nio.ByteBuffer;
 -import java.util.*;
 -
 -import org.apache.cassandra.config.*;
 -import org.apache.cassandra.cql3.*;
 -import org.apache.cassandra.cql3.statements.CFStatement;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.SortedSet;
 +import java.util.stream.Collectors;
 +
 +import com.datastax.driver.core.ProtocolVersion;
 +import com.datastax.driver.core.TypeCodec;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
++import org.apache.cassandra.config.SchemaConstants;
 +import org.apache.cassandra.cql3.ColumnSpecification;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UpdateParameters;
 +import org.apache.cassandra.cql3.functions.UDHelper;
  import org.apache.cassandra.cql3.statements.CreateTableStatement;
 +import org.apache.cassandra.cql3.statements.CreateTypeStatement;
 +import org.apache.cassandra.cql3.statements.ModificationStatement;
  import org.apache.cassandra.cql3.statements.ParsedStatement;
  import org.apache.cassandra.cql3.statements.UpdateStatement;
  import org.apache.cassandra.db.Clustering;
  import org.apache.cassandra.db.DecoratedKey;
+ import org.apache.cassandra.db.SystemKeyspace;
 -import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.UserType;
  import org.apache.cassandra.db.partitions.Partition;
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.exceptions.RequestValidationException;
 +import org.apache.cassandra.exceptions.SyntaxException;
  import org.apache.cassandra.io.sstable.format.SSTableFormat;
++import org.apache.cassandra.schema.Functions;
  import org.apache.cassandra.schema.KeyspaceMetadata;
  import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.schema.SchemaKeyspace;
+ import org.apache.cassandra.schema.Tables;
  import org.apache.cassandra.schema.Types;
++import org.apache.cassandra.schema.Views;
  import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.Pair;
  
  /**
@@@ -495,94 -497,54 +501,105 @@@ public class CQLSSTableWriter implement
              return this;
          }
  
 -        private static CFMetaData getTableMetadata(String schema)
 +        @SuppressWarnings("resource")
 +        public CQLSSTableWriter build()
          {
 -            CFStatement parsed = (CFStatement)QueryProcessor.parseStatement(schema);
 -            // tables with UDTs are currently not supported by CQLSSTableWrite, so we just
use Types.none(), for now
 -            // see CASSANDRA-10624 for more details
 -            CreateTableStatement statement = (CreateTableStatement) ((CreateTableStatement.RawStatement)
parsed).prepare(Types.none()).statement;
 -            statement.validate(ClientState.forInternalCalls());
 -            return statement.getCFMetaData();
 -        }
 +            if (directory == null)
 +                throw new IllegalStateException("No ouptut directory specified, you should
provide a directory with inDirectory()");
 +            if (schemaStatement == null)
 +                throw new IllegalStateException("Missing schema, you should provide the
schema for the SSTable to create with forTable()");
 +            if (insertStatement == null)
 +                throw new IllegalStateException("No insert statement specified, you should
provide an insert statement through using()");
  
 -        private static <T extends CQLStatement> Pair<T, List<ColumnSpecification>>
getStatement(String query, Class<T> klass, String type)
 -        {
 -            try
 +            synchronized (CQLSSTableWriter.class)
              {
 -                ClientState state = ClientState.forInternalCalls();
 -                ParsedStatement.Prepared prepared = QueryProcessor.getStatement(query, state);
 -                CQLStatement stmt = prepared.statement;
 -                stmt.validate(state);
++                if (Schema.instance.getKSMetaData(SchemaConstants.SCHEMA_KEYSPACE_NAME)
== null)
++                    Schema.instance.load(SchemaKeyspace.metadata());
++                if (Schema.instance.getKSMetaData(SchemaConstants.SYSTEM_KEYSPACE_NAME)
== null)
++                    Schema.instance.load(SystemKeyspace.metadata());
+ 
 -                if (!stmt.getClass().equals(klass))
 -                    throw new IllegalArgumentException("Invalid query, must be a " + type
+ " statement");
 +                String keyspace = schemaStatement.keyspace();
  
 -                return Pair.create(klass.cast(stmt), prepared.boundNames);
 -            }
 -            catch (RequestValidationException e)
 -            {
 -                throw new IllegalArgumentException(e.getMessage(), e);
 +                if (Schema.instance.getKSMetaData(keyspace) == null)
-                     Schema.instance.load(KeyspaceMetadata.create(keyspace, KeyspaceParams.simple(1)));
++                {
++                    Schema.instance.load(KeyspaceMetadata.create(keyspace,
++                                                                 KeyspaceParams.simple(1),
++                                                                 Tables.none(),
++                                                                 Views.none(),
++                                                                 Types.none(),
++                                                                 Functions.none()));
++                }
++
++
++                KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
++                CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
++                if (cfMetaData == null)
++                {
++                    Types types = createTypes(keyspace);
++                    cfMetaData = createTable(types);
++
++                    Schema.instance.load(cfMetaData);
++                    Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)).withSwapped(types));
++                }
 +
-                 createTypes(keyspace);
-                 CFMetaData cfMetaData = createTable(keyspace);
 +                Pair<UpdateStatement, List<ColumnSpecification>> preparedInsert
= prepareInsert();
 +
 +                AbstractSSTableSimpleWriter writer = sorted
 +                                                     ? new SSTableSimpleWriter(directory,
cfMetaData, preparedInsert.left.updatedColumns())
 +                                                     : new SSTableSimpleUnsortedWriter(directory,
cfMetaData, preparedInsert.left.updatedColumns(), bufferSizeInMB);
 +
 +                if (formatType != null)
 +                    writer.setSSTableFormatType(formatType);
 +
 +                return new CQLSSTableWriter(writer, preparedInsert.left, preparedInsert.right);
              }
          }
  
-         private void createTypes(String keyspace)
 -        @SuppressWarnings("resource")
 -        public CQLSSTableWriter build()
++        private Types createTypes(String keyspace)
          {
-             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
 -            if (directory == null)
 -                throw new IllegalStateException("No ouptut directory specified, you should
provide a directory with inDirectory()");
 -            if (schema == null)
 -                throw new IllegalStateException("Missing schema, you should provide the
schema for the SSTable to create with forTable()");
 -            if (insert == null)
 -                throw new IllegalStateException("No insert statement specified, you should
provide an insert statement through using()");
 +            Types.RawBuilder builder = Types.rawBuilder(keyspace);
 +            for (CreateTypeStatement st : typeStatements)
 +                st.addToRawBuilder(builder);
- 
-             ksm = ksm.withSwapped(builder.build());
-             Schema.instance.setKeyspaceMetadata(ksm);
++            return builder.build();
 +        }
+ 
 -            AbstractSSTableSimpleWriter writer = sorted
 -                                               ? new SSTableSimpleWriter(directory, schema,
insert.updatedColumns())
 -                                               : new SSTableSimpleUnsortedWriter(directory,
schema, insert.updatedColumns(), bufferSizeInMB);
 +        /**
 +         * Creates the table according to schema statement
 +         *
-          * @param keyspace name of the keyspace where table should be created
++         * @param types types this table should be created with
 +         */
-         private CFMetaData createTable(String keyspace)
++        private CFMetaData createTable(Types types)
 +        {
-             KeyspaceMetadata ksm = Schema.instance.getKSMetaData(keyspace);
++            CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(types).statement;
++            statement.validate(ClientState.forInternalCalls());
  
-             CFMetaData cfMetaData = ksm.tables.getNullable(schemaStatement.columnFamily());
-             if (cfMetaData == null)
-             {
-                 CreateTableStatement statement = (CreateTableStatement) schemaStatement.prepare(ksm.types).statement;
-                 statement.validate(ClientState.forInternalCalls());
- 
-                 cfMetaData = statement.getCFMetaData();
- 
-                 Schema.instance.load(cfMetaData);
-                 Schema.instance.setKeyspaceMetadata(ksm.withSwapped(ksm.tables.with(cfMetaData)));
-             }
 -            if (formatType != null)
 -                writer.setSSTableFormatType(formatType);
++            CFMetaData cfMetaData = statement.getCFMetaData();
 +
 +            if (partitioner != null)
 +                return cfMetaData.copy(partitioner);
 +            else
 +                return cfMetaData;
 +        }
 +
 +        /**
 +         * Prepares insert statement for writing data to SSTable
 +         *
 +         * @return prepared Insert statement and it's bound names
 +         */
 +        private Pair<UpdateStatement, List<ColumnSpecification>> prepareInsert()
 +        {
 +            ParsedStatement.Prepared cqlStatement = insertStatement.prepare();
 +            UpdateStatement insert = (UpdateStatement) cqlStatement.statement;
 +            insert.validate(ClientState.forInternalCalls());
 +
 +            if (insert.hasConditions())
 +                throw new IllegalArgumentException("Conditional statements are not supported");
 +            if (insert.isCounter())
 +                throw new IllegalArgumentException("Counter update statements are not supported");
 +            if (cqlStatement.boundNames.isEmpty())
 +                throw new IllegalArgumentException("Provided insert statement has no bind
variables");
  
 -            return new CQLSSTableWriter(writer, insert, boundNames);
 +            return Pair.create(insert, cqlStatement.boundNames);
          }
      }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2074b89/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index ac7f4ad,7d79036..a400612
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@@ -222,326 -225,8 +222,326 @@@ public class CQLSSTableWriterTes
  
      }
  
 +
 +
 +    private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
 +    private class WriterThread extends Thread
 +    {
 +        private final File dataDir;
 +        private final int id;
 +        public volatile Exception exception;
 +
 +        public WriterThread(File dataDir, int id)
 +        {
 +            this.dataDir = dataDir;
 +            this.id = id;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            String schema = "CREATE TABLE cql_keyspace2.table2 ("
 +                    + "  k int,"
 +                    + "  v int,"
 +                    + "  PRIMARY KEY (k, v)"
 +                    + ")";
 +            String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
 +            CQLSSTableWriter writer = CQLSSTableWriter.builder()
 +                    .inDirectory(dataDir)
 +                    .forTable(schema)
 +                    .using(insert).build();
 +
 +            try
 +            {
 +                for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
 +                {
 +                    writer.addRow(id, i);
 +                }
 +                writer.close();
 +            }
 +            catch (Exception e)
 +            {
 +                exception = e;
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void testConcurrentWriters() throws Exception
 +    {
 +        final String KS = "cql_keyspace2";
 +        final String TABLE = "table2";
 +
 +        File tempdir = Files.createTempDir();
 +        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
 +        assert dataDir.mkdirs();
 +
 +        WriterThread[] threads = new WriterThread[5];
 +        for (int i = 0; i < threads.length; i++)
 +        {
 +            WriterThread thread = new WriterThread(dataDir, i);
 +            threads[i] = thread;
 +            thread.start();
 +        }
 +
 +        for (WriterThread thread : threads)
 +        {
 +            thread.join();
 +            assert !thread.isAlive() : "Thread should be dead by now";
 +            if (thread.exception != null)
 +            {
 +                throw thread.exception;
 +            }
 +        }
 +
 +        loadSSTables(dataDir, KS);
 +
 +        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
 +        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
 +    }
 +
 +    @Test
 +    @SuppressWarnings("unchecked")
 +    public void testWritesWithUdts() throws Exception
 +    {
 +        final String KS = "cql_keyspace3";
 +        final String TABLE = "table3";
 +
 +        final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
 +                              + "  k int,"
 +                              + "  v1 list<frozen<tuple2>>,"
 +                              + "  v2 frozen<tuple3>,"
 +                              + "  PRIMARY KEY (k)"
 +                              + ")";
 +
 +        File tempdir = Files.createTempDir();
 +        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
 +        assert dataDir.mkdirs();
 +
 +        CQLSSTableWriter writer = CQLSSTableWriter.builder()
 +                                                  .inDirectory(dataDir)
 +                                                  .withType("CREATE TYPE " + KS + ".tuple2
(a int, b int)")
 +                                                  .withType("CREATE TYPE " + KS + ".tuple3
(a int, b int, c int)")
 +                                                  .forTable(schema)
 +                                                  .using("INSERT INTO " + KS + "." + TABLE
+ " (k, v1, v2) " +
 +                                                         "VALUES (?, ?, ?)").build();
 +
 +        UserType tuple2Type = writer.getUDType("tuple2");
 +        UserType tuple3Type = writer.getUDType("tuple3");
 +        for (int i = 0; i < 100; i++)
 +        {
 +            writer.addRow(i,
 +                          ImmutableList.builder()
 +                                       .add(tuple2Type.newValue()
 +                                                      .setInt("a", i * 10)
 +                                                      .setInt("b", i * 20))
 +                                       .add(tuple2Type.newValue()
 +                                                      .setInt("a", i * 30)
 +                                                      .setInt("b", i * 40))
 +                                       .build(),
 +                          tuple3Type.newValue()
 +                                    .setInt("a", i * 100)
 +                                    .setInt("b", i * 200)
 +                                    .setInt("c", i * 300));
 +        }
 +
 +        writer.close();
 +        loadSSTables(dataDir, KS);
 +
 +        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS
+ "." + TABLE);
 +        TypeCodec collectionCodec = UDHelper.codecFor(DataType.CollectionType.frozenList(tuple2Type));
 +        TypeCodec tuple3Codec = UDHelper.codecFor(tuple3Type);
 +
 +        assertEquals(resultSet.size(), 100);
 +        int cnt = 0;
 +        for (UntypedResultSet.Row row: resultSet) {
 +            assertEquals(cnt,
 +                         row.getInt("k"));
 +            List<UDTValue> values = (List<UDTValue>) collectionCodec.deserialize(row.getBytes("v1"),
 +                                                                                 ProtocolVersion.NEWEST_SUPPORTED);
 +            assertEquals(values.get(0).getInt("a"), cnt * 10);
 +            assertEquals(values.get(0).getInt("b"), cnt * 20);
 +            assertEquals(values.get(1).getInt("a"), cnt * 30);
 +            assertEquals(values.get(1).getInt("b"), cnt * 40);
 +
 +            UDTValue v2 = (UDTValue) tuple3Codec.deserialize(row.getBytes("v2"), ProtocolVersion.NEWEST_SUPPORTED);
 +
 +            assertEquals(v2.getInt("a"), cnt * 100);
 +            assertEquals(v2.getInt("b"), cnt * 200);
 +            assertEquals(v2.getInt("c"), cnt * 300);
 +            cnt++;
 +        }
 +    }
 +
 +    @Test
 +    @SuppressWarnings("unchecked")
 +    public void testWritesWithDependentUdts() throws Exception
 +    {
 +        final String KS = "cql_keyspace4";
 +        final String TABLE = "table4";
 +
 +        final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
 +                              + "  k int,"
 +                              + "  v1 frozen<nested_tuple>,"
 +                              + "  PRIMARY KEY (k)"
 +                              + ")";
 +
 +        File tempdir = Files.createTempDir();
 +        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
 +        assert dataDir.mkdirs();
 +
 +        CQLSSTableWriter writer = CQLSSTableWriter.builder()
 +                                                  .inDirectory(dataDir)
 +                                                  .withType("CREATE TYPE " + KS + ".nested_tuple
(c int, tpl frozen<tuple2>)")
 +                                                  .withType("CREATE TYPE " + KS + ".tuple2
(a int, b int)")
 +                                                  .forTable(schema)
 +                                                  .using("INSERT INTO " + KS + "." + TABLE
+ " (k, v1) " +
 +                                                         "VALUES (?, ?)")
 +                                                  .build();
 +
 +        UserType tuple2Type = writer.getUDType("tuple2");
 +        UserType nestedTuple = writer.getUDType("nested_tuple");
 +        TypeCodec tuple2Codec = UDHelper.codecFor(tuple2Type);
 +        TypeCodec nestedTupleCodec = UDHelper.codecFor(nestedTuple);
 +
 +        for (int i = 0; i < 100; i++)
 +        {
 +            writer.addRow(i,
 +                          nestedTuple.newValue()
 +                                     .setInt("c", i * 100)
 +                                     .set("tpl",
 +                                          tuple2Type.newValue()
 +                                                    .setInt("a", i * 200)
 +                                                    .setInt("b", i * 300),
 +                                          tuple2Codec));
 +        }
 +
 +        writer.close();
 +        loadSSTables(dataDir, KS);
 +
 +        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS
+ "." + TABLE);
 +
 +        assertEquals(resultSet.size(), 100);
 +        int cnt = 0;
 +        for (UntypedResultSet.Row row: resultSet) {
 +            assertEquals(cnt,
 +                         row.getInt("k"));
 +            UDTValue nestedTpl = (UDTValue) nestedTupleCodec.deserialize(row.getBytes("v1"),
 +                                                                         ProtocolVersion.NEWEST_SUPPORTED);
 +            assertEquals(nestedTpl.getInt("c"), cnt * 100);
 +            UDTValue tpl = nestedTpl.getUDTValue("tpl");
 +            assertEquals(tpl.getInt("a"), cnt * 200);
 +            assertEquals(tpl.getInt("b"), cnt * 300);
 +
 +            cnt++;
 +        }
 +    }
 +
 +    @Test
 +    public void testUnsetValues() throws Exception
 +    {
 +        final String KS = "cql_keyspace5";
 +        final String TABLE = "table5";
 +
 +        final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
 +                              + "  k int,"
 +                              + "  c1 int,"
 +                              + "  c2 int,"
 +                              + "  v text,"
 +                              + "  PRIMARY KEY (k, c1, c2)"
 +                              + ")";
 +
 +        File tempdir = Files.createTempDir();
 +        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
 +        assert dataDir.mkdirs();
 +
 +        CQLSSTableWriter writer = CQLSSTableWriter.builder()
 +                                                  .inDirectory(dataDir)
 +                                                  .forTable(schema)
 +                                                  .using("INSERT INTO " + KS + "." + TABLE
+ " (k, c1, c2, v) " +
 +                                                         "VALUES (?, ?, ?, ?)")
 +                                                  .build();
 +
 +        try
 +        {
 +            writer.addRow(1, 1, 1);
 +            fail("Passing less arguments then expected in prepared statement should not
work.");
 +        }
 +        catch (InvalidRequestException e)
 +        {
 +            assertEquals("Invalid number of arguments, expecting 4 values but got 3",
 +                         e.getMessage());
 +        }
 +
 +        try
 +        {
 +            writer.addRow(1, 1, CQLSSTableWriter.UNSET_VALUE, "1");
 +            fail("Unset values should not work with clustering columns.");
 +        }
 +        catch (InvalidRequestException e)
 +        {
 +            assertEquals("Invalid unset value for column c2",
 +                         e.getMessage());
 +        }
 +
 +        try
 +        {
 +            writer.addRow(ImmutableMap.<String, Object>builder().put("k", 1).put("c1",
1).put("v", CQLSSTableWriter.UNSET_VALUE).build());
 +            fail("Unset or null clustering columns should not be allowed.");
 +        }
 +        catch (InvalidRequestException e)
 +        {
 +            assertEquals("Invalid null value in condition for column c2",
 +                         e.getMessage());
 +        }
 +
 +        writer.addRow(1, 1, 1, CQLSSTableWriter.UNSET_VALUE);
 +        writer.addRow(2, 2, 2, null);
 +        writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE));
 +        writer.addRow(ImmutableMap.<String, Object>builder()
 +                                  .put("k", 4)
 +                                  .put("c1", 4)
 +                                  .put("c2", 4)
 +                                  .put("v", CQLSSTableWriter.UNSET_VALUE)
 +                                  .build());
 +        writer.addRow(Arrays.asList(3, 3, 3, CQLSSTableWriter.UNSET_VALUE));
 +        writer.addRow(5, 5, 5, "5");
 +
 +        writer.close();
 +        loadSSTables(dataDir, KS);
 +
 +        UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS
+ "." + TABLE);
 +        Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
 +        UntypedResultSet.Row r1 = iter.next();
 +        assertEquals(1, r1.getInt("k"));
 +        assertEquals(1, r1.getInt("c1"));
 +        assertEquals(1, r1.getInt("c2"));
 +        assertEquals(false, r1.has("v"));
 +        UntypedResultSet.Row r2 = iter.next();
 +        assertEquals(2, r2.getInt("k"));
 +        assertEquals(2, r2.getInt("c1"));
 +        assertEquals(2, r2.getInt("c2"));
 +        assertEquals(false, r2.has("v"));
 +        UntypedResultSet.Row r3 = iter.next();
 +        assertEquals(3, r3.getInt("k"));
 +        assertEquals(3, r3.getInt("c1"));
 +        assertEquals(3, r3.getInt("c2"));
 +        assertEquals(false, r3.has("v"));
 +        UntypedResultSet.Row r4 = iter.next();
 +        assertEquals(4, r4.getInt("k"));
 +        assertEquals(4, r4.getInt("c1"));
 +        assertEquals(4, r4.getInt("c2"));
 +        assertEquals(false, r3.has("v"));
 +        UntypedResultSet.Row r5 = iter.next();
 +        assertEquals(5, r5.getInt("k"));
 +        assertEquals(5, r5.getInt("c1"));
 +        assertEquals(5, r5.getInt("c2"));
 +        assertEquals(true, r5.has("v"));
 +        assertEquals("5", r5.getString("v"));
 +    }
 +
      @Test
-     public void testUpdateSatement() throws Exception
+     public void testUpdateStatement() throws Exception
      {
          final String KS = "cql_keyspace6";
          final String TABLE = "table6";
@@@ -589,6 -273,131 +589,55 @@@
          assertFalse(iter.hasNext());
      }
  
+     @Test
+     public void testNativeFunctions() throws Exception
+     {
+         final String KS = "cql_keyspace7";
+         final String TABLE = "table7";
+ 
+         final String schema = "CREATE TABLE " + KS + "." + TABLE + " ("
+                               + "  k int,"
+                               + "  c1 int,"
+                               + "  c2 int,"
+                               + "  v blob,"
+                               + "  PRIMARY KEY (k, c1, c2)"
+                               + ")";
+ 
+         File tempdir = Files.createTempDir();
+         File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
+         assert dataDir.mkdirs();
+ 
+         CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                   .inDirectory(dataDir)
+                                                   .forTable(schema)
+                                                   .using("INSERT INTO " + KS + "." + TABLE
+ " (k, c1, c2, v) VALUES (?, ?, ?, textAsBlob(?))")
+                                                   .build();
+ 
+         writer.addRow(1, 2, 3, "abc");
+         writer.addRow(4, 5, 6, "efg");
+ 
+         writer.close();
+         loadSSTables(dataDir, KS);
+ 
+         UntypedResultSet resultSet = QueryProcessor.executeInternal("SELECT * FROM " + KS
+ "." + TABLE);
+         assertEquals(2, resultSet.size());
+ 
+         Iterator<UntypedResultSet.Row> iter = resultSet.iterator();
+         UntypedResultSet.Row r1 = iter.next();
+         assertEquals(1, r1.getInt("k"));
+         assertEquals(2, r1.getInt("c1"));
+         assertEquals(3, r1.getInt("c2"));
+         assertEquals(ByteBufferUtil.bytes("abc"), r1.getBytes("v"));
+ 
+         UntypedResultSet.Row r2 = iter.next();
+         assertEquals(4, r2.getInt("k"));
+         assertEquals(5, r2.getInt("c1"));
+         assertEquals(6, r2.getInt("c2"));
+         assertEquals(ByteBufferUtil.bytes("efg"), r2.getBytes("v"));
+ 
+         assertFalse(iter.hasNext());
+     }
+ 
 -    private static final int NUMBER_WRITES_IN_RUNNABLE = 10;
 -    private class WriterThread extends Thread
 -    {
 -        private final File dataDir;
 -        private final int id;
 -        public volatile Exception exception;
 -
 -        public WriterThread(File dataDir, int id)
 -        {
 -            this.dataDir = dataDir;
 -            this.id = id;
 -        }
 -
 -        @Override
 -        public void run()
 -        {
 -            String schema = "CREATE TABLE cql_keyspace2.table2 ("
 -                    + "  k int,"
 -                    + "  v int,"
 -                    + "  PRIMARY KEY (k, v)"
 -                    + ")";
 -            String insert = "INSERT INTO cql_keyspace2.table2 (k, v) VALUES (?, ?)";
 -            CQLSSTableWriter writer = CQLSSTableWriter.builder()
 -                    .inDirectory(dataDir)
 -                    .forTable(schema)
 -                    .using(insert).build();
 -
 -            try
 -            {
 -                for (int i = 0; i < NUMBER_WRITES_IN_RUNNABLE; i++)
 -                {
 -                    writer.addRow(id, i);
 -                }
 -                writer.close();
 -            }
 -            catch (Exception e)
 -            {
 -                exception = e;
 -            }
 -        }
 -    }
 -
 -    @Test
 -    public void testConcurrentWriters() throws Exception
 -    {
 -        final String KS = "cql_keyspace2";
 -        final String TABLE = "table2";
 -
 -        File tempdir = Files.createTempDir();
 -        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
 -        assert dataDir.mkdirs();
 -
 -        WriterThread[] threads = new WriterThread[5];
 -        for (int i = 0; i < threads.length; i++)
 -        {
 -            WriterThread thread = new WriterThread(dataDir, i);
 -            threads[i] = thread;
 -            thread.start();
 -        }
 -
 -        for (WriterThread thread : threads)
 -        {
 -            thread.join();
 -            assert !thread.isAlive() : "Thread should be dead by now";
 -            if (thread.exception != null)
 -            {
 -                throw thread.exception;
 -            }
 -        }
 -
 -        loadSSTables(dataDir, KS);
 -
 -        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace2.table2;");
 -        assertEquals(threads.length * NUMBER_WRITES_IN_RUNNABLE, rs.size());
 -    }
 -
      private static void loadSSTables(File dataDir, String ks) throws ExecutionException,
InterruptedException
      {
          SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message