apex-users mailing list archives

From Max Bridgewater <max.bridgewa...@gmail.com>
Subject Re: Cassandra Output Operator: Invalid null value for partition key part id
Date Mon, 05 Dec 2016 19:21:38 GMT
Thanks, that's exactly what happened: one of my input events had a null id.
What happened from there is what confused me, but it now makes complete
sense. Because this event could not be delivered successfully, Apex kept
retrying it, which means the messages that came after the malformed one were
stuck behind it. I wonder if there is a way to limit the number of retries,
or if this is left to the application layer.
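Lacking a built-in retry cap, one application-layer option is to validate tuples before they reach the Cassandra writer. A minimal sketch of the check (the POJO and helper class below are hypothetical, not from the actual pipeline code):

```java
import java.util.UUID;

// Hypothetical POJO mirroring the fields the JsonParser emits; the class
// and field names here are assumptions, not taken from the real pipeline.
class TestUser {
    UUID id;          // maps to the partition key of testapp.testuser
    String city;
    String firstName;
    String lastName;
}

// Minimal guard: Cassandra rejects a null partition key outright, so drop
// (or divert) any tuple whose id is null before the output operator sees it.
final class TupleValidator {
    static boolean isValid(TestUser u) {
        return u != null && u.id != null;
    }
}
```

In Apex, a check like this could live in a small pass-through operator wired between jsonParser and CassandraDataWriter, so a single malformed event is dropped (or routed to a dead-letter stream) instead of blocking everything behind it.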

On Mon, Dec 5, 2016 at 11:53 AM, Priyanka Gugale <priyag@apache.org> wrote:

> Is it possible that your input has a null value for the id field? id seems
> to be your primary key, so it cannot accept null values.
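>
> For example, an explicit null for the partition key is rejected by
> Cassandra (a hypothetical insert against your schema; the values are made
> up for illustration):
>
>     INSERT INTO testapp.testuser (id, city, fname, lname)
>     VALUES (null, 'springfield', 'jane', 'doe');
>     -- rejected: null is not allowed for the partition key column id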
>
> -Priyanka
>
> On Mon, Dec 5, 2016 at 10:04 PM, Max Bridgewater <
> max.bridgewater@gmail.com> wrote:
>
>> Folks,
>>
>> I am trying to write some sample data to Cassandra. The operator keeps
>> dying and being restarted. The failure trace is below. This failure happens
>> even if no data is going through the pipeline.
>>
>> Here is how I create the Cassandra operator:
>>
>>         List<FieldInfo> fieldInfos = Lists.newArrayList();
>>         fieldInfos.add(new FieldInfo("id", "id", null));
>>         fieldInfos.add(new FieldInfo("city", "city", null));
>>         fieldInfos.add(new FieldInfo("fname", "firstName", null));
>>         fieldInfos.add(new FieldInfo("lname", "lastName", null));
>>
>>         KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn",
>>             new KafkaSinglePortInputOperator());
>>         in.setInitialOffset(
>>             AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
>>         JsonParser parser = dag.addOperator("jsonParser", new JsonParser());
>>         CassandraTransactionalStore transactionalStore =
>>             new CassandraTransactionalStore();
>>         CassandraPOJOOutputOperator out = new CassandraPOJOOutputOperator();
>>         out.setStore(transactionalStore);
>>         out.setFieldInfos(fieldInfos);
>>         dag.addOperator("CassandraDataWriter", out);
>>         dag.addStream("parse", in.outputPort, parser.in);
>>         dag.addStream("data", parser.out, out.input);
>>
>> The json parser seems to work well and deserializes Kafka events into
>> POJOs that I then want to write to Cassandra.
>>
>> The Cassandra schema is as follows:
>>
>> CREATE TABLE testapp.testuser (
>>     id uuid PRIMARY KEY,
>>     city text,
>>     fname text,
>>     lname text
>> ) WITH bloom_filter_fp_chance = 0.01
>>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>>     AND comment = ''
>>     AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
>>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>>     AND dclocal_read_repair_chance = 0.1
>>     AND default_time_to_live = 0
>>     AND gc_grace_seconds = 864000
>>     AND max_index_interval = 2048
>>     AND memtable_flush_period_in_ms = 0
>>     AND min_index_interval = 128
>>     AND read_repair_chance = 0.0
>>     AND speculative_retry = '99.0PERCENTILE';
>>
>> Again, even without sending data, the exception happens. What am I
>> missing? Any hint would be appreciated.
>>
>> 2016-12-05 16:24:31,643 INFO com.datatorrent.common.util.AsyncFSStorageAgent:
>> using /app/hadoop/tmp/nm-local-dir/usercache/dtadmin/appcache/application_1480950234717_0002/container_1480950234717_0002_01_000137/tmp/chkp6701939091095420196
>> as the basepath for checkpointing.
>> 2016-12-05 16:24:31,704 ERROR com.datatorrent.stram.engine.StreamingContainer:
>> Operator set [OperatorDeployInfo[id=3,name=CassandraDataWriter,type=GENERIC,checkpoint={58458e1b00000a4f, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=input,streamId=data,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
>> com.datastax.driver.core.exceptions.InvalidQueryException: Invalid null value for partition key part id
>> at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
>> at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
>> at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
>> at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:64)
>>
>>
>>
>
