incubator-cassandra-user mailing list archives

From "Hiller, Dean" <Dean.Hil...@nrel.gov>
Subject Re: Bulk loading into CQL3 Composite Columns
Date Fri, 31 May 2013 15:11:19 GMT
Another option is to leave it out of the primary key and use PlayOrm to query, but to succeed
and scale you would also need to use PlayOrm partitions; then you can query within a partition
and sort there.

Dean

From: Daniel Morton <daniel@djmorton.com>
Reply-To: "user@cassandra.apache.org" <user@cassandra.apache.org>
Date: Friday, May 31, 2013 9:01 AM
To: "user@cassandra.apache.org" <user@cassandra.apache.org>
Subject: Re: Bulk loading into CQL3 Composite Columns

Hi Keith... Thanks for all your help so far.

I've done some additional testing and I can see no difference between having all the columns
as part of the primary key and having only a subset.  Granted, in my contrived example there
is no benefit to having all the columns in the primary key, but I believe it makes sense in
my real use case... (If you imagine val1 being a category of data and val2 being an amount,
then I can filter on a value for val1 and get sorted results for val2.  I could accomplish
the same thing by adding val1 to the row key, but I wanted to ensure my rows stay at an
appropriate width.)

I also tried using the Astyanax library with the Composite handling you suggested and I see
exactly the same results as when I use the CompositeType Builder.

If my composite type has two integers, representing my val1 and val2, and I add two values
to my builder (or to the Astyanax Composite class), sstableloader imports the data,
but I get an ArrayIndexOutOfBoundsException when selecting from the table, and cqlsh actually
appears to lose the connection to the DB... I have to restart cqlsh before I can do anything
further.  The stack trace for the exception Cassandra throws is:

ERROR 09:33:01,130 Error occurred during processing of message.
java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.cassandra.cql3.statements.ColumnGroupMap.add(ColumnGroupMap.java:43)
        at org.apache.cassandra.cql3.statements.ColumnGroupMap.access$200(ColumnGroupMap.java:31)
        at org.apache.cassandra.cql3.statements.ColumnGroupMap$Builder.add(ColumnGroupMap.java:128)
        at org.apache.cassandra.cql3.statements.SelectStatement.process(SelectStatement.java:730)
        at org.apache.cassandra.cql3.statements.SelectStatement.processResults(SelectStatement.java:134)
        at org.apache.cassandra.cql3.statements.SelectStatement.execute(SelectStatement.java:128)
        at org.apache.cassandra.cql3.statements.SelectStatement.execute(SelectStatement.java:56)
        at org.apache.cassandra.cql3.QueryProcessor.processStatement(QueryProcessor.java:132)
        at org.apache.cassandra.cql3.QueryProcessor.process(QueryProcessor.java:143)
        at org.apache.cassandra.thrift.CassandraServer.execute_cql3_query(CassandraServer.java:1707)
        at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4074)
        at org.apache.cassandra.thrift.Cassandra$Processor$execute_cql3_query.getResult(Cassandra.java:4062)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:32)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:34)
        at org.apache.cassandra.thrift.CustomTThreadPoolServer$WorkerProcess.run(CustomTThreadPoolServer.java:199)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)


However, I have found a way that I can trick it into working... Or so it seems, although it
strikes me as hacky.  If I define my column comparator for the SSTableSimpleUnsortedWriter
as:

final List<AbstractType<?>> compositeTypes = new ArrayList<>();
compositeTypes.add(IntegerType.instance);
compositeTypes.add(IntegerType.instance);
compositeTypes.add(IntegerType.instance);

which adds an extra IntegerType, as I am actually only trying to insert 2 integer values,
and I build my composite for the row as such:

final Composite columnComposite = new Composite();
columnComposite.setComponent(0, 5, IntegerSerializer.get());
columnComposite.setComponent(1, 10, IntegerSerializer.get());
columnComposite.setComponent(2, 20, IntegerSerializer.get()); // Dummy value; I don't actually want a value with index 2 inserted

The data imports correctly: the value 5 gets stored as val1, 10 gets stored as val2, and 20
appears to be thrown away.
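
My best guess (and it is only a guess on my part) is that for a CQL3 table the underlying
comparator carries one extra trailing component that holds the CQL column name, which would
explain why the writer wants three components even though I only have two clustering columns.
If that is right, a sketch like the following, with an empty UTF8 component in the last slot
instead of a dummy integer, should express the same thing more directly.  This is untested,
the UTF8Type assumption is mine, and it reuses the imports from the full source below plus
org.apache.cassandra.db.marshal.UTF8Type:

final List<AbstractType<?>> compositeTypes = new ArrayList<>();
compositeTypes.add(IntegerType.instance); // val1
compositeTypes.add(IntegerType.instance); // val2
compositeTypes.add(UTF8Type.instance);    // assumed slot for the CQL3 column name

final CompositeType compType = CompositeType.getInstance(compositeTypes);
final CompositeType.Builder builder = new CompositeType.Builder(compType);

builder.add(bytes(5));               // val1
builder.add(bytes(10));              // val2
builder.add(ByteBuffer.allocate(0)); // empty column-name component instead of a dummy 20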


Am I just doing something wonky here, or am I running up against a bug somewhere?  The full
working source is:

package com.exinda.bigdata.cassandra;

import static org.apache.cassandra.utils.ByteBufferUtil.bytes;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.CompositeType.Builder;
import org.apache.cassandra.db.marshal.IntegerType;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;

//Assumes a keyspace called 'bigdata' and a table called 'test' with the following definition:
//  CREATE TABLE test (key TEXT, val1 INT, val2 INT, PRIMARY KEY (key, val1, val2));

public class CassandraLoader {
    public static void main(String[] args) throws Exception {
        final List<AbstractType<?>> compositeTypes = new ArrayList<>();
        compositeTypes.add(IntegerType.instance);
        compositeTypes.add(IntegerType.instance);
        compositeTypes.add(IntegerType.instance);

        final CompositeType compType = CompositeType.getInstance(compositeTypes);

        final SSTableSimpleUnsortedWriter ssTableWriter = new SSTableSimpleUnsortedWriter(
                new File("/tmp/cassandra_bulk/bigdata/test"),
                new Murmur3Partitioner() ,
                "bigdata",
                "test",
                compType,
                null,
                128);

        final Builder builder = new CompositeType.Builder(compType);

        builder.add(bytes(5));
        builder.add(bytes(10));
        builder.add(bytes(20));

        ssTableWriter.newRow(bytes("0|20101201"));
        ssTableWriter.addColumn(
                builder.build(),
                ByteBuffer.allocate(0),
                System.nanoTime()
                );
        ssTableWriter.close();
    }
}
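
As a quick sanity check (my own idea, not something from the bulk loading docs), it might
help to render the built column name before handing it to addColumn, since
AbstractType.getString gives a human-readable, delimiter-separated view of the components:

// Hypothetical check: store the built composite once and print it so you
// can see which value lands in which component before writing the row.
final ByteBuffer columnName = builder.build();
System.out.println(compType.getString(columnName));
ssTableWriter.addColumn(columnName, ByteBuffer.allocate(0), System.nanoTime());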



Any thoughts?

Daniel Morton


On Thu, May 30, 2013 at 8:12 PM, Keith Wright <kwright@nanigans.com> wrote:
StringSerializer and CompositeSerializer are actually from Astyanax, for what it's worth.
I would recommend you change your table definition so that only val1 is part of the primary
key.  There is no reason to include val2.  Perhaps sending the IndexOutOfBoundsException
stack trace would help.

All the StringSerializer is really doing is:

ByteBuffer.wrap(obj.getBytes(charset))

using UTF-8 as the charset (see http://grepcode.com/file/repo1.maven.org/maven2/com.netflix.astyanax/astyanax/1.56.26/com/netflix/astyanax/serializers/StringSerializer.java#StringSerializer)

You can see the source for CompositeSerializer here:  http://grepcode.com/file/repo1.maven.org/maven2/com.netflix.astyanax/astyanax/1.56.26/com/netflix/astyanax/serializers/CompositeSerializer.java
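
For what it's worth, here is a minimal sketch of my own (not from either library) showing that
ByteBufferUtil.bytes and the StringSerializer-style UTF-8 wrap should produce the same bytes
for a row key:

import java.nio.ByteBuffer;
import java.nio.charset.Charset;

import org.apache.cassandra.utils.ByteBufferUtil;

// Both approaches UTF-8 encode the string, so a key written with
// ByteBufferUtil.bytes(...) should be byte-for-byte identical to one
// written with StringSerializer.get().toByteBuffer(...).
public class KeyEncodingCheck {
    public static void main(String[] args) {
        ByteBuffer viaUtil = ByteBufferUtil.bytes("20101201");
        ByteBuffer viaWrap = ByteBuffer.wrap("20101201".getBytes(Charset.forName("UTF-8")));
        System.out.println(viaUtil.equals(viaWrap)); // expected: true
    }
}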

Good luck!

From: Daniel Morton <daniel@djmorton.com>
Reply-To: "user@cassandra.apache.org" <user@cassandra.apache.org>
Date: Thursday, May 30, 2013 4:33 PM
To: "user@cassandra.apache.org" <user@cassandra.apache.org>
Subject: Re: Bulk loading into CQL3 Composite Columns

Hi Keith... Thanks for the help.

I'm presently not importing the Hector library (which is where classes like CompositeSerializer
and StringSerializer come from, yes?), only the cassandra-all Maven artifact.  Is the behaviour
of the CompositeSerializer much different from using a Builder from a CompositeType?  When
I saw the error about '20101201' failing to decode, I tried only including the values for
val1 and val2, like:


final List<AbstractType<?>> compositeTypes = new ArrayList<>();
compositeTypes.add(IntegerType.instance);
compositeTypes.add(IntegerType.instance);

final CompositeType compType = CompositeType.getInstance(compositeTypes);
final Builder builder = new CompositeType.Builder(compType);

builder.add(bytes(5));
builder.add(bytes(10));

ssTableWriter.newRow(bytes("20101201"));
ssTableWriter.addColumn(builder.build(), ByteBuffer.allocate(0), System.currentTimeMillis());



(where bytes is the statically imported ByteBufferUtil.bytes method)

But doing this resulted in an ArrayIndexOutOfBounds exception from Cassandra.  Is doing this
any different than using the CompositeSerializer you suggest?

Thanks again,

Daniel Morton


On Thu, May 30, 2013 at 3:32 PM, Keith Wright <kwright@nanigans.com> wrote:
You do not want to repeat the first item of your primary key.  If you recall, in CQL3
a primary key as defined below indicates that the row key is the first item (key) and then
the column names are composites of (val1, val2), although I don't see why you need val2 as
part of the primary key in this case.  In any event, you would do something like this (although
I've never tested passing a null value):

ssTableWriter.newRow(StringSerializer.get().toByteBuffer("20101201"));
Composite columnComposite = new Composite();
columnComposite.setComponent(0, 5, IntegerSerializer.get());
columnComposite.setComponent(1, 10, IntegerSerializer.get());
ssTableWriter.addColumn(
    CompositeSerializer.get().toByteBuffer(columnComposite),
    null,
    System.currentTimeMillis()
);

From: Daniel Morton <daniel@djmorton.com>
Reply-To: "user@cassandra.apache.org" <user@cassandra.apache.org>
Date: Thursday, May 30, 2013 1:06 PM
To: "user@cassandra.apache.org" <user@cassandra.apache.org>
Subject: Bulk loading into CQL3 Composite Columns

Hi All.  I am trying to bulk load some data into a CQL3 table using the sstableloader utility
and I am having some difficulty figuring out how to use the SSTableSimpleUnsortedWriter with
composite columns.

I have created this simple contrived table for testing:

create table test (key varchar, val1 int, val2 int, primary key (key, val1, val2));

Loosely following the bulk loading example in the docs, I have constructed the following method
to create my temporary SSTables.

public static void main(String[] args) throws Exception {
   final List<AbstractType<?>> compositeTypes = new ArrayList<>();
   compositeTypes.add(UTF8Type.instance);
   compositeTypes.add(IntegerType.instance);
   compositeTypes.add(IntegerType.instance);
   final CompositeType compType =
      CompositeType.getInstance(compositeTypes);
   SSTableSimpleUnsortedWriter ssTableWriter =
      new SSTableSimpleUnsortedWriter(
         new File("/tmp/cassandra_bulk/bigdata/test"),
         new Murmur3Partitioner() ,
         "bigdata",
         "test",
         compType,
         null,
         128);

   final Builder builder =
      new CompositeType.Builder(compType);

   builder.add(bytes("20101201"));
   builder.add(bytes(5));
   builder.add(bytes(10));

   ssTableWriter.newRow(bytes("20101201"));
   ssTableWriter.addColumn(
         builder.build(),
         ByteBuffer.allocate(0),
         System.currentTimeMillis()
   );

   ssTableWriter.close();
}

When I execute this method and load the data using sstableloader, if I do a 'SELECT * FROM
test' in cqlsh, I get the results:

key      | val1       | val2
----------------------------
20101201 | '20101201' | 5

And the error:  Failed to decode value '20101201' (for column 'val1') as int.

The error I get makes sense, as apparently it tried to place the key value into the val1 column.
 From this error, I then assumed that the key value should not be part of the composite type
when the row is added, so I removed the UTF8Type from the composite type, and only added the
two integer values through the builder, but when I repeat the select with that data loaded,
Cassandra throws an ArrayIndexOutOfBoundsException in the ColumnGroupMap class.

Can anyone offer any advice on the correct way to insert data via the bulk loading process
into CQL3 tables with composite columns?  Does the fact that I am not inserting a value for
the columns make a difference?  For my particular use case, all I care about is the values
in the column names themselves (and the associated sorting that goes with them).

Any info or help anyone could provide would be very much appreciated.

Regards,

Daniel Morton


