cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [4/5] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Date Tue, 11 Feb 2014 23:05:27 GMT
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
	test/unit/org/apache/cassandra/db/CleanupTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0e4f00c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0e4f00c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0e4f00c

Branch: refs/heads/trunk
Commit: b0e4f00c8c3986c0702f2b08b0d2cd4dd18b1dbf
Parents: 662f546 7787dea
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed Feb 12 01:51:09 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Feb 12 01:51:09 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/ModificationStatement.java  | 28 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0e4f00c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 16cbd0a,56059a1..aec6f5e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -16,33 -3,24 +16,34 @@@ Merged from 1.2
   * Fix partition and range deletes not triggering flush (CASSANDRA-6655)
   * Fix mean cells and mean row size per sstable calculations (CASSANDRA-6667)
   * Compact hints after partial replay to clean out tombstones (CASSANDRA-6666)
+  * Log USING TTL/TIMESTAMP in a counter update warning (CASSANDRA-6649)
  
 -
 -1.2.15
 - * Move handling of migration event source to solve bootstrap race (CASSANDRA-6648)
 - * Make sure compaction throughput value doesn't overflow with int math (CASSANDRA-6647)
 -
 -
 -1.2.14
 - * Reverted code to limit CQL prepared statement cache by size (CASSANDRA-6592)
 - * add cassandra.default_messaging_version property to allow easier
 -   upgrading from 1.1 (CASSANDRA-6619)
 - * Allow executing CREATE statements multiple times (CASSANDRA-6471)
 - * Don't send confusing info with timeouts (CASSANDRA-6491)
 - * Don't resubmit counter mutation runnables internally (CASSANDRA-6427)
 - * Don't drop local mutations without a hint (CASSANDRA-6510)
 - * Don't allow null max_hint_window_in_ms (CASSANDRA-6419)
 - * Validate SliceRange start and finish lengths (CASSANDRA-6521)
 +2.0.5
 + * Reduce garbage generated by bloom filter lookups (CASSANDRA-6609)
 + * Add ks.cf names to tombstone logging (CASSANDRA-6597)
 + * Use LOCAL_QUORUM for LWT operations at LOCAL_SERIAL (CASSANDRA-6495)
 + * Wait for gossip to settle before accepting client connections (CASSANDRA-4288)
 + * Delete unfinished compaction incrementally (CASSANDRA-6086)
 + * Allow specifying custom secondary index options in CQL3 (CASSANDRA-6480)
 + * Improve replica pinning for cache efficiency in DES (CASSANDRA-6485)
 + * Fix LOCAL_SERIAL from thrift (CASSANDRA-6584)
 + * Don't special case received counts in CAS timeout exceptions (CASSANDRA-6595)
 + * Add support for 2.1 global counter shards (CASSANDRA-6505)
 + * Fix NPE when streaming connection is not yet established (CASSANDRA-6210)
 + * Avoid rare duplicate read repair triggering (CASSANDRA-6606)
 + * Fix paging discardFirst (CASSANDRA-6555)
 + * Fix ArrayIndexOutOfBoundsException in 2ndary index query (CASSANDRA-6470)
 + * Release sstables upon rebuilding 2i (CASSANDRA-6635)
 + * Add AbstractCompactionStrategy.startup() method (CASSANDRA-6637)
 + * SSTableScanner may skip rows during cleanup (CASSANDRA-6638)
 + * sstables from stalled repair sessions can resurrect deleted data (CASSANDRA-6503)
 + * Switch stress to use ITransportFactory (CASSANDRA-6641)
 + * Fix IllegalArgumentException during prepare (CASSANDRA-6592)
 + * Fix possible loss of 2ndary index entries during compaction (CASSANDRA-6517)
 + * Fix direct Memory on architectures that do not support unaligned long access
 +   (CASSANDRA-6628)
 + * Let scrub optionally skip broken counter partitions (CASSANDRA-5930)
 +Merged from 1.2:
   * fsync compression metadata (CASSANDRA-6531)
   * Validate CF existence on execution for prepared statement (CASSANDRA-6535)
   * Add ability to throttle batchlog replay (CASSANDRA-6550)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0e4f00c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 2567043,1b4dc37..676286c
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -20,10 -20,10 +20,12 @@@ package org.apache.cassandra.cql3.state
  import java.nio.ByteBuffer;
  import java.util.*;
  
 +import org.github.jamm.MemoryMeter;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.auth.Permission;
 +import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.cql3.*;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.filter.ColumnSlice;
@@@ -38,78 -36,38 +40,83 @@@ import org.apache.cassandra.service.CAS
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.thrift.ThriftValidation;
  import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.Pair;
  
 -/**
 - * Abstract class for statements that apply on a given column family.
 +/*
 + * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
   */
 -public abstract class ModificationStatement extends CFStatement implements CQLStatement,
MeasurableForPreparedCache
 +public abstract class ModificationStatement implements CQLStatement, MeasurableForPreparedCache
  {
 -    public static enum Type
 -    {
 -        LOGGED, UNLOGGED, COUNTER
 -    }
 +    private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]",
false);
  
+     private static final Logger logger = LoggerFactory.getLogger(ModificationStatement.class);
+ 
+     private static boolean loggedCounterTTL = false;
+     private static boolean loggedCounterTimestamp = false;
+ 
 -    protected Type type;
 +    public final CFMetaData cfm;
 +    public final Attributes attrs;
 +
 +    private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier,
Restriction>();
 +    private final List<Operation> columnOperations = new ArrayList<Operation>();
 +
 +    private int boundTerms;
 +    private List<Operation> columnConditions;
 +    private boolean ifNotExists;
 +
 +    public ModificationStatement(CFMetaData cfm, Attributes attrs)
 +    {
 +        this.cfm = cfm;
 +        this.attrs = attrs;
 +    }
 +
 +    public long measureForPreparedCache(MemoryMeter meter)
 +    {
 +        return meter.measure(this)
 +             + meter.measureDeep(attrs)
 +             + meter.measureDeep(processedKeys)
 +             + meter.measureDeep(columnOperations)
 +             + (columnConditions == null ? 0 : meter.measureDeep(columnConditions));
 +    }
 +
 +    public abstract boolean requireFullClusteringKey();
 +    public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder,
UpdateParameters params) throws InvalidRequestException;
 +
 +    public int getBoundTerms()
 +    {
 +        return boundTerms;
 +    }
 +
 +    public String keyspace()
 +    {
 +        return cfm.ksName;
 +    }
 +
 +    public String columnFamily()
 +    {
 +        return cfm.cfName;
 +    }
 +
 +    public boolean isCounter()
 +    {
 +        return cfm.getDefaultValidator().isCommutative();
 +    }
  
 -    private Long timestamp;
 -    private final int timeToLive;
 +    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
 +    {
 +        return attrs.getTimestamp(now, variables);
 +    }
  
 -    public ModificationStatement(CFName name, Attributes attrs)
 +    public boolean isTimestampSet()
      {
 -        this(name, attrs.timestamp, attrs.timeToLive);
 +        return attrs.isTimestampSet();
      }
  
 -    public ModificationStatement(CFName name, Long timestamp, int timeToLive)
 +    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
      {
 -        super(name);
 -        this.timestamp = timestamp;
 -        this.timeToLive = timeToLive;
 +        return attrs.getTimeToLive(variables);
      }
  
      public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@@ -123,196 -77,95 +130,215 @@@
  
      public void validate(ClientState state) throws InvalidRequestException
      {
 -        if (timeToLive < 0)
 -            throw new InvalidRequestException("A TTL must be greater or equal to 0");
 +        if (hasConditions() && attrs.isTimestampSet())
 +            throw new InvalidRequestException("Custom timestamps are not allowed when conditions
are used");
+ 
 -        if (timeToLive > ExpiringColumn.MAX_TTL)
 -            throw new InvalidRequestException(String.format("ttl is too large. requested
(%d) maximum (%d)", timeToLive, ExpiringColumn.MAX_TTL));
 -
 -        if (type == Type.COUNTER)
++        if (isCounter())
+         {
 -            if (timestamp != null && !loggedCounterTimestamp)
++            if (attrs.isTimestampSet() && !loggedCounterTimestamp)
+             {
+                 logger.warn("Detected use of 'USING TIMESTAMP' in a counter UPDATE. This
is invalid " +
+                             "because counters do not use timestamps, and the timestamp has
been ignored. " +
+                             "Such queries will be rejected in Cassandra 2.1+ - please fix
your queries before then.");
+                 loggedCounterTimestamp = true;
+             }
+ 
 -            if (timeToLive != 0 && !loggedCounterTTL)
++            if (attrs.isTimeToLiveSet() && !loggedCounterTTL)
+             {
+                 logger.warn("Detected use of 'USING TTL' in a counter UPDATE. This is invalid
" +
+                             "because counter tables do not support TTL, and the TTL value
has been ignored. " +
+                             "Such queries will be rejected in Cassandra 2.1+ - please fix
your queries before then.");
+                 loggedCounterTTL = true;
+             }
+         }
      }
  
 -    protected abstract void validateConsistency(ConsistencyLevel cl) throws InvalidRequestException;
 +    public void addOperation(Operation op)
 +    {
 +        columnOperations.add(op);
 +    }
  
 -    public ResultMessage execute(ConsistencyLevel cl, QueryState queryState, List<ByteBuffer>
variables) throws RequestExecutionException, RequestValidationException
 +    public List<Operation> getOperations()
      {
 -        if (cl == null)
 -            throw new InvalidRequestException("Invalid empty consistency level");
 +        return columnOperations;
 +    }
  
 -        validateConsistency(cl);
 +    public void addCondition(Operation op)
 +    {
 +        if (columnConditions == null)
 +            columnConditions = new ArrayList<Operation>();
  
 -        // The type should have been set by now or we have a bug
 -        assert type != null;
 +        columnConditions.add(op);
 +    }
  
 -        Collection<? extends IMutation> mutations = getMutations(variables, false,
cl, queryState.getTimestamp());
 -        if (mutations.isEmpty())
 -            return null;
 +    public void setIfNotExistCondition()
 +    {
 +        ifNotExists = true;
 +    }
  
 -        switch (type)
 -        {
 -            case LOGGED:
 -                if (mutations.size() > 1)
 -                    StorageProxy.mutateAtomically((Collection<RowMutation>) mutations,
cl);
 -                else
 -                    StorageProxy.mutate(mutations, cl);
 -                break;
 -            case UNLOGGED:
 -            case COUNTER:
 -                StorageProxy.mutate(mutations, cl);
 -                break;
 -            default:
 -                throw new AssertionError();
 -        }
 +    private void addKeyValues(ColumnIdentifier name, Restriction values) throws InvalidRequestException
 +    {
 +        if (processedKeys.put(name, values) != null)
 +            throw new InvalidRequestException(String.format("Multiple definitions found
for PRIMARY KEY part %s", name));
 +    }
  
 -        return null;
 +    public void addKeyValue(ColumnIdentifier name, Term value) throws InvalidRequestException
 +    {
 +        addKeyValues(name, new Restriction.EQ(value, false));
      }
  
 -    public ResultMessage executeInternal(QueryState queryState) throws RequestValidationException,
RequestExecutionException
 +    public void processWhereClause(List<Relation> whereClause, VariableSpecifications
names) throws InvalidRequestException
      {
 -        for (IMutation mutation : getMutations(Collections.<ByteBuffer>emptyList(),
true, null, queryState.getTimestamp()))
 -            mutation.apply();
 -        return null;
 +        CFDefinition cfDef = cfm.getCfDef();
 +        for (Relation rel : whereClause)
 +        {
 +            CFDefinition.Name name = cfDef.get(rel.getEntity());
 +            if (name == null)
 +                throw new InvalidRequestException(String.format("Unknown key identifier
%s", rel.getEntity()));
 +
 +            switch (name.kind)
 +            {
 +                case KEY_ALIAS:
 +                case COLUMN_ALIAS:
 +                    Restriction restriction;
 +
 +                    if (rel.operator() == Relation.Type.EQ)
 +                    {
 +                        Term t = rel.getValue().prepare(name);
 +                        t.collectMarkerSpecification(names);
 +                        restriction = new Restriction.EQ(t, false);
 +                    }
 +                    else if (name.kind == CFDefinition.Name.Kind.KEY_ALIAS && rel.operator()
== Relation.Type.IN)
 +                    {
 +                        if (rel.getValue() != null)
 +                        {
 +                            Term t = rel.getValue().prepare(name);
 +                            t.collectMarkerSpecification(names);
 +                            restriction = Restriction.IN.create(t);
 +                        }
 +                        else
 +                        {
 +                            List<Term> values = new ArrayList<Term>(rel.getInValues().size());
 +                            for (Term.Raw raw : rel.getInValues())
 +                            {
 +                                Term t = raw.prepare(name);
 +                                t.collectMarkerSpecification(names);
 +                                values.add(t);
 +                            }
 +                            restriction = Restriction.IN.create(values);
 +                        }
 +                    }
 +                    else
 +                    {
 +                        throw new InvalidRequestException(String.format("Invalid operator
%s for PRIMARY KEY part %s", rel.operator(), name));
 +                    }
 +
 +                    addKeyValues(name.name, restriction);
 +                    break;
 +                case VALUE_ALIAS:
 +                case COLUMN_METADATA:
 +                    throw new InvalidRequestException(String.format("Non PRIMARY KEY %s
found in where clause", name));
 +            }
 +        }
      }
  
 -    public long getTimestamp(long now)
 +    public List<ByteBuffer> buildPartitionKeyNames(List<ByteBuffer> variables)
 +    throws InvalidRequestException
      {
 -        return timestamp == null ? now : timestamp;
 +        CFDefinition cfDef = cfm.getCfDef();
 +        ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder();
 +        List<ByteBuffer> keys = new ArrayList<ByteBuffer>();
 +        for (CFDefinition.Name name : cfDef.keys.values())
 +        {
 +            Restriction r = processedKeys.get(name.name);
 +            if (r == null)
 +                throw new InvalidRequestException(String.format("Missing mandatory PRIMARY
KEY part %s", name));
 +
 +            List<ByteBuffer> values = r.values(variables);
 +
 +            if (keyBuilder.remainingCount() == 1)
 +            {
 +                for (ByteBuffer val : values)
 +                {
 +                    if (val == null)
 +                        throw new InvalidRequestException(String.format("Invalid null value
for partition key part %s", name));
 +                    keys.add(keyBuilder.copy().add(val).build());
 +                }
 +            }
 +            else
 +            {
 +                if (values.size() != 1)
 +                    throw new InvalidRequestException("IN is only supported on the last
column of the partition key");
 +                ByteBuffer val = values.get(0);
 +                if (val == null)
 +                    throw new InvalidRequestException(String.format("Invalid null value
for partition key part %s", name));
 +                keyBuilder.add(val);
 +            }
 +        }
 +        return keys;
      }
  
 -    public void setTimestamp(long timestamp)
 +    public ColumnNameBuilder createClusteringPrefixBuilder(List<ByteBuffer> variables)
 +    throws InvalidRequestException
      {
 -        this.timestamp = timestamp;
 +        CFDefinition cfDef = cfm.getCfDef();
 +        ColumnNameBuilder builder = cfDef.getColumnNameBuilder();
 +        CFDefinition.Name firstEmptyKey = null;
 +        for (CFDefinition.Name name : cfDef.columns.values())
 +        {
 +            Restriction r = processedKeys.get(name.name);
 +            if (r == null)
 +            {
 +                firstEmptyKey = name;
 +                if (requireFullClusteringKey() && cfDef.isComposite && !cfDef.isCompact)
 +                    throw new InvalidRequestException(String.format("Missing mandatory PRIMARY
KEY part %s", name));
 +            }
 +            else if (firstEmptyKey != null)
 +            {
 +                throw new InvalidRequestException(String.format("Missing PRIMARY KEY part
%s since %s is set", firstEmptyKey.name, name.name));
 +            }
 +            else
 +            {
 +                List<ByteBuffer> values = r.values(variables);
 +                assert values.size() == 1; // We only allow IN for row keys so far
 +                ByteBuffer val = values.get(0);
 +                if (val == null)
 +                    throw new InvalidRequestException(String.format("Invalid null value
for clustering key part %s", name));
 +                builder.add(val);
 +            }
 +        }
 +        return builder;
      }
  
 -    public boolean isSetTimestamp()
 +    protected CFDefinition.Name getFirstEmptyKey()
      {
 -        return timestamp != null;
 +        for (CFDefinition.Name name : cfm.getCfDef().columns.values())
 +        {
 +            if (processedKeys.get(name.name) == null)
 +                return name;
 +        }
 +        return null;
      }
  
 -    public int getTimeToLive()
 +    protected Map<ByteBuffer, ColumnGroupMap> readRequiredRows(List<ByteBuffer>
partitionKeys, ColumnNameBuilder clusteringPrefix, boolean local, ConsistencyLevel cl)
 +    throws RequestExecutionException, RequestValidationException
      {
 -        return timeToLive;
 +        // Lists SET operation incurs a read.
 +        Set<ByteBuffer> toRead = null;
 +        for (Operation op : columnOperations)
 +        {
 +            if (op.requiresRead())
 +            {
 +                if (toRead == null)
 +                    toRead = new TreeSet<ByteBuffer>(UTF8Type.instance);
 +                toRead.add(op.columnName.key);
 +            }
 +        }
 +
 +        return toRead == null ? null : readRows(partitionKeys, clusteringPrefix, toRead,
(CompositeType)cfm.comparator, local, cl);
      }
  
 -    protected Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> keys,
ColumnNameBuilder builder, Set<ByteBuffer> toRead, CompositeType composite, boolean
local, ConsistencyLevel cl)
 +    private Map<ByteBuffer, ColumnGroupMap> readRows(List<ByteBuffer> partitionKeys,
ColumnNameBuilder clusteringPrefix, Set<ByteBuffer> toRead, CompositeType composite,
boolean local, ConsistencyLevel cl)
      throws RequestExecutionException, RequestValidationException
      {
          try
@@@ -508,219 -226,8 +534,219 @@@
       * @return list of the mutations
       * @throws InvalidRequestException on invalid requests
       */
 -    protected abstract Collection<? extends IMutation> getMutations(List<ByteBuffer>
variables, boolean local, ConsistencyLevel cl, long now)
 -    throws RequestExecutionException, RequestValidationException;
 +    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables,
boolean local, ConsistencyLevel cl, long now, boolean isBatch)
 +    throws RequestExecutionException, RequestValidationException
 +    {
 +        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
 +        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
 +
 +        // Some lists operation requires reading
 +        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix,
local, cl);
 +        UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now,
variables), getTimeToLive(variables), rows);
 +
 +        Collection<IMutation> mutations = new ArrayList<IMutation>();
 +        for (ByteBuffer key: keys)
 +        {
 +            ThriftValidation.validateKey(cfm, key);
 +            ColumnFamily cf = updateForKey(key, clusteringPrefix, params);
 +            mutations.add(makeMutation(key, cf, cl, isBatch));
 +        }
 +        return mutations;
 +    }
 +
 +    private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl,
boolean isBatch)
 +    {
 +        RowMutation rm;
 +        if (isBatch)
 +        {
 +            // we might group other mutations together with this one later, so make it mutable
 +            rm = new RowMutation(cfm.ksName, key);
 +            rm.add(cf);
 +        }
 +        else
 +        {
 +            rm = new RowMutation(cfm.ksName, key, cf);
 +        }
 +        return isCounter() ? new CounterMutation(rm, cl) : rm;
 +    }
 +
 +    private static abstract class CQL3CasConditions implements CASConditions
 +    {
 +        protected final ColumnNameBuilder rowPrefix;
 +        protected final long now;
 +
 +        protected CQL3CasConditions(ColumnNameBuilder rowPrefix, long now)
 +        {
 +            this.rowPrefix = rowPrefix;
 +            this.now = now;
 +        }
  
 -    public abstract ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws
InvalidRequestException;
 +        public IDiskAtomFilter readFilter()
 +        {
 +            // We always read the row entirely as on CAS failure we want to be able to distinguish
between "row exists
 +            // but all values on why there were conditions are null" and "row doesn't exists",
and we can't rely on the
 +            // row marker for that (see #6623)
 +            return new SliceQueryFilter(rowPrefix.build(), rowPrefix.buildAsEndOfRange(),
false, 1, rowPrefix.componentCount());
 +        }
 +    }
 +
 +    private static class NotExistCondition extends CQL3CasConditions
 +    {
 +        private NotExistCondition(ColumnNameBuilder rowPrefix, long now)
 +        {
 +            super(rowPrefix, now);
 +        }
 +
 +        public boolean appliesTo(ColumnFamily current)
 +        {
 +            return current == null || current.hasOnlyTombstones(now);
 +        }
 +    }
 +
 +    private static class ColumnsConditions extends CQL3CasConditions
 +    {
 +        private final ColumnFamily expected;
 +
 +        private ColumnsConditions(ColumnNameBuilder rowPrefix,
 +                                  CFMetaData cfm,
 +                                  ByteBuffer key,
 +                                  Collection<Operation> conditions,
 +                                  List<ByteBuffer> variables,
 +                                  long now) throws InvalidRequestException
 +        {
 +            super(rowPrefix, now);
 +            this.expected = TreeMapBackedSortedColumns.factory.create(cfm);
 +
 +            // When building the conditions, we should not use a TTL. It's not useful, and
if a very low ttl (1 seconds) is used, it's possible
 +            // for it to expire before the actual build of the conditions which would break
since we would then testing for the presence of tombstones.
 +            UpdateParameters params = new UpdateParameters(cfm, variables, now, 0, null);
 +
 +            // Conditions
 +            for (Operation condition : conditions)
 +                condition.execute(key, expected, rowPrefix.copy(), params);
 +        }
 +
 +        public boolean appliesTo(ColumnFamily current)
 +        {
 +            if (current == null)
 +                return false;
 +
 +            for (Column e : expected)
 +            {
 +                Column c = current.getColumn(e.name());
 +                if (e.isLive(now))
 +                {
 +                    if (c == null || !c.isLive(now) || !c.value().equals(e.value()))
 +                        return false;
 +                }
 +                else
 +                {
 +                    // If we have a tombstone in expected, it means the condition tests
that the column is
 +                    // null, so check that we have no value
 +                    if (c != null && c.isLive(now))
 +                        return false;
 +                }
 +            }
 +            return true;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return expected.toString();
 +        }
 +    }
 +
 +    public static abstract class Parsed extends CFStatement
 +    {
 +        protected final Attributes.Raw attrs;
 +        private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions;
 +        private final boolean ifNotExists;
 +
 +        protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier,
Operation.RawUpdate>> conditions, boolean ifNotExists)
 +        {
 +            super(name);
 +            this.attrs = attrs;
 +            this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier,
Operation.RawUpdate>>emptyList() : conditions;
 +            this.ifNotExists = ifNotExists;
 +        }
 +
 +        public ParsedStatement.Prepared prepare() throws InvalidRequestException
 +        {
 +            VariableSpecifications boundNames = getBoundVariables();
 +            ModificationStatement statement = prepare(boundNames);
 +            return new ParsedStatement.Prepared(statement, boundNames);
 +        }
 +
 +        public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
 +        {
 +            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
 +            CFDefinition cfDef = metadata.getCfDef();
 +
 +            // The collected count in the beginning of preparation.
 +            // Will start at non-zero for statements nested inside a BatchStatement (the
second and the further ones).
 +            int collected = boundNames.getCollectedCount();
 +
 +            Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
 +            preparedAttributes.collectMarkerSpecification(boundNames);
 +
 +            ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
 +
-             if (ifNotExists || (conditions != null && !conditions.isEmpty()))
++            if (ifNotExists || !conditions.isEmpty())
 +            {
 +                if (stmt.isCounter())
 +                    throw new InvalidRequestException("Conditional updates are not supported
on counter tables");
 +
 +                if (attrs.timestamp != null)
 +                    throw new InvalidRequestException("Cannot provide custom timestamp for
conditional update");
 +
 +                if (ifNotExists)
 +                {
 +                    // To have both 'IF NOT EXISTS' and some other conditions doesn't make
sense.
 +                    // So far this is enforced by the parser, but let's assert it for sanity
if ever the parse changes.
 +                    assert conditions.isEmpty();
 +                    stmt.setIfNotExistCondition();
 +                }
 +                else
 +                {
 +                    for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
 +                    {
 +                        CFDefinition.Name name = cfDef.get(entry.left);
 +                        if (name == null)
 +                            throw new InvalidRequestException(String.format("Unknown identifier
%s", entry.left));
 +
 +                        /*
 +                         * Lists column names are based on a server-side generated timeuuid.
So we can't allow lists
 +                         * operation or that would yield unexpected results (update that
should apply wouldn't). So for
 +                         * now, we just refuse lists, which also save use from having to
bother about the read that some
 +                         * list operation involve.
 +                         */
 +                        if (name.type instanceof ListType)
 +                            throw new InvalidRequestException(String.format("List operation
(%s) are not allowed in conditional updates", name));
 +
 +                        Operation condition = entry.right.prepare(name);
 +                        assert !condition.requiresRead();
 +
 +                        condition.collectMarkerSpecification(boundNames);
 +
 +                        switch (name.kind)
 +                        {
 +                            case KEY_ALIAS:
 +                            case COLUMN_ALIAS:
 +                                throw new InvalidRequestException(String.format("PRIMARY
KEY part %s found in SET part", entry.left));
 +                            case VALUE_ALIAS:
 +                            case COLUMN_METADATA:
 +                                stmt.addCondition(condition);
 +                                break;
 +                        }
 +                    }
 +                }
 +            }
 +
 +            stmt.boundTerms = boundNames.getCollectedCount() - collected;
 +            return stmt;
 +        }
 +
 +        protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications
boundNames, Attributes attrs) throws InvalidRequestException;
 +    }
  }


Mime
View raw message