cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [2/4] git commit: Merge branch 'cassandra-1.2' into cassandra-2.0
Date Fri, 20 Dec 2013 17:17:30 GMT
Merge branch 'cassandra-1.2' into cassandra-2.0

Conflicts:
	src/java/org/apache/cassandra/cql3/QueryProcessor.java
	src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
	src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
	src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
	src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
	src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
	src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1435ffd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1435ffd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1435ffd

Branch: refs/heads/cassandra-2.0
Commit: b1435ffd1dcdeab9f0ebc52cf1ed3cddafa13c4e
Parents: 4d36bbf b97c523
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Fri Dec 20 20:14:07 2013 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Dec 20 20:14:07 2013 +0300

----------------------------------------------------------------------
 .../org/apache/cassandra/cql3/CQLStatement.java |  2 +-
 .../apache/cassandra/cql3/QueryProcessor.java   | 20 ++++++++++----------
 .../statements/AuthenticationStatement.java     |  2 +-
 .../cql3/statements/AuthorizationStatement.java |  2 +-
 .../cql3/statements/BatchStatement.java         |  4 ++--
 .../cql3/statements/ModificationStatement.java  |  4 ++--
 .../cql3/statements/ParsedStatement.java        |  2 +-
 .../statements/SchemaAlteringStatement.java     |  2 +-
 .../cql3/statements/SelectStatement.java        |  4 ++--
 .../cql3/statements/TruncateStatement.java      |  2 +-
 .../cassandra/cql3/statements/UseStatement.java |  2 +-
 .../cassandra/thrift/CassandraServer.java       |  2 +-
 .../transport/messages/BatchMessage.java        |  5 ++---
 13 files changed, 26 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index ad3c4b4,332aea1..02361a8
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -209,22 -152,10 +209,22 @@@ public class QueryProcesso
      public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
      throws RequestExecutionException, RequestValidationException
      {
 +        return process(queryString, queryState, new QueryOptions(cl, Collections.<ByteBuffer>emptyList()));
 +    }
 +
 +    public static ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
 +    throws RequestExecutionException, RequestValidationException
 +    {
          CQLStatement prepared = getStatement(queryString, queryState.getClientState()).statement;
-         if (prepared.getBoundsTerms() != options.getValues().size())
 -        if (prepared.getBoundTerms() > 0)
 -            throw new InvalidRequestException("Cannot execute query with bind variables");
 -        return processStatement(prepared, cl, queryState, Collections.<ByteBuffer>emptyList());
++        if (prepared.getBoundTerms() != options.getValues().size())
 +            throw new InvalidRequestException("Invalid amount of bind variables");
 +
 +        return processStatement(prepared, queryState, options, queryString);
 +    }
 +
 +    public static CQLStatement parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
 +    {
 +        return getStatement(queryStr, queryState.getClientState()).statement;
      }
  
      public static UntypedResultSet process(String query, ConsistencyLevel cl) throws RequestExecutionException
@@@ -286,20 -218,12 +286,20 @@@
      throws RequestValidationException
      {
          ParsedStatement.Prepared prepared = getStatement(queryString, clientState);
-         int bountTerms = prepared.statement.getBoundsTerms();
-         if (bountTerms > FBUtilities.MAX_UNSIGNED_SHORT)
-             throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", bountTerms, FBUtilities.MAX_UNSIGNED_SHORT));
-         assert bountTerms == prepared.boundNames.size();
++        int boundTerms = prepared.statement.getBoundTerms();
++        if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT)
++            throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", boundTerms, FBUtilities.MAX_UNSIGNED_SHORT));
++        assert boundTerms == prepared.boundNames.size();
 +
          ResultMessage.Prepared msg = storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift);
  
 -        int bountTerms = prepared.statement.getBoundTerms();
 -        if (bountTerms > FBUtilities.MAX_UNSIGNED_SHORT)
 -            throw new InvalidRequestException(String.format("Too many markers(?). %d markers exceed the allowed maximum of %d", bountTerms, FBUtilities.MAX_UNSIGNED_SHORT));
 -        assert bountTerms == prepared.boundNames.size();
 +        if (!postPreparationHooks.isEmpty())
 +        {
 +            PreparationContext context = new PreparationContext(clientState, queryString, prepared.boundNames);
 +            for (PostPreparationHook hook : postPreparationHooks)
 +                hook.processStatement(prepared.statement, context);
 +        }
 +
          return msg;
      }
  
@@@ -330,22 -254,21 +330,22 @@@
              MD5Digest statementId = MD5Digest.compute(toHash);
              preparedStatements.put(statementId, prepared.statement);
              logger.trace(String.format("Stored prepared statement %s with %d bind markers",
 -                         statementId,
 -                         prepared.statement.getBoundTerms()));
 -            return new ResultMessage.Prepared(statementId, prepared.boundNames);
 +                                       statementId,
-                                        prepared.statement.getBoundsTerms()));
++                                       prepared.statement.getBoundTerms()));
 +            return new ResultMessage.Prepared(statementId, prepared);
          }
      }
  
 -    public static ResultMessage processPrepared(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
 +    public static ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options)
      throws RequestExecutionException, RequestValidationException
      {
 +        List<ByteBuffer> variables = options.getValues();
          // Check to see if there are any bound variables to verify
-         if (!(variables.isEmpty() && (statement.getBoundsTerms() == 0)))
+         if (!(variables.isEmpty() && (statement.getBoundTerms() == 0)))
          {
-             if (variables.size() != statement.getBoundsTerms())
+             if (variables.size() != statement.getBoundTerms())
                  throw new InvalidRequestException(String.format("there were %d markers(?) in CQL but %d bound variables",
-                                                                 statement.getBoundsTerms(),
+                                                                 statement.getBoundTerms(),
                                                                  variables.size()));
  
              // at this point there is a match in count between markers and variables that is non-zero

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 311a3c7,05dae48..25f61fb
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@@ -72,11 -64,14 +72,11 @@@ public class BatchStatement implements 
          return size;
      }
  
-     public int getBoundsTerms()
 -    @Override
 -    public void prepareKeyspace(ClientState state) throws InvalidRequestException
++    public int getBoundTerms()
      {
 -        for (ModificationStatement statement : statements)
 -            statement.prepareKeyspace(state);
 +        return boundTerms;
      }
  
 -    @Override
      public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
      {
          for (ModificationStatement statement : statements)
@@@ -182,52 -147,4 +182,52 @@@
      {
          return String.format("BatchStatement(type=%s, statements=%s)", type, statements);
      }
 +
 +    public static class Parsed extends CFStatement
 +    {
 +        private final Type type;
 +        private final Attributes.Raw attrs;
 +        private final List<ModificationStatement.Parsed> parsedStatements;
 +
 +        public Parsed(Type type, Attributes.Raw attrs, List<ModificationStatement.Parsed> parsedStatements)
 +        {
 +            super(null);
 +            this.type = type;
 +            this.attrs = attrs;
 +            this.parsedStatements = parsedStatements;
 +        }
 +
 +        @Override
 +        public void prepareKeyspace(ClientState state) throws InvalidRequestException
 +        {
 +            for (ModificationStatement.Parsed statement : parsedStatements)
 +                statement.prepareKeyspace(state);
 +        }
 +
 +        public ParsedStatement.Prepared prepare() throws InvalidRequestException
 +        {
-             VariableSpecifications boundNames = getBoundsVariables();
++            VariableSpecifications boundNames = getBoundVariables();
 +
 +            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(parsedStatements.size());
 +            for (ModificationStatement.Parsed parsed : parsedStatements)
 +            {
 +                ModificationStatement stmt = parsed.prepare(boundNames);
 +                if (stmt.hasConditions())
 +                    throw new InvalidRequestException("Conditional updates are not allowed in batches");
 +
 +                if (stmt.isCounter() && type != Type.COUNTER)
 +                    throw new InvalidRequestException("Counter mutations are only allowed in COUNTER batches");
 +
 +                if (!stmt.isCounter() && type == Type.COUNTER)
 +                    throw new InvalidRequestException("Only counter mutations are allowed in COUNTER batches");
 +
 +                statements.add(stmt);
 +            }
 +
 +            Attributes prepAttrs = attrs.prepare("[batch]", "[batch]");
 +            prepAttrs.collectMarkerSpecification(boundNames);
 +
 +            return new ParsedStatement.Prepared(new BatchStatement(boundNames.size(), type, statements, prepAttrs), boundNames);
 +        }
 +    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 8833f34,bfbf511..9e0fd62
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@@ -36,75 -31,35 +36,75 @@@ import org.apache.cassandra.exceptions.
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.thrift.ThriftValidation;
  import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  
 -/**
 - * Abstract class for statements that apply on a given column family.
 +/*
 + * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE.
   */
 -public abstract class ModificationStatement extends CFStatement implements CQLStatement, MeasurableForPreparedCache
 +public abstract class ModificationStatement implements CQLStatement, MeasurableForPreparedCache
  {
 -    public static enum Type
 +    private static final ColumnIdentifier CAS_RESULT_COLUMN = new ColumnIdentifier("[applied]", false);
 +
 +    public final CFMetaData cfm;
 +    public final Attributes attrs;
 +
 +    private final Map<ColumnIdentifier, Restriction> processedKeys = new HashMap<ColumnIdentifier, Restriction>();
 +    private final List<Operation> columnOperations = new ArrayList<Operation>();
 +
 +    private int boundTerms;
 +    private List<Operation> columnConditions;
 +    private boolean ifNotExists;
 +
 +    public ModificationStatement(CFMetaData cfm, Attributes attrs)
      {
 -        LOGGED, UNLOGGED, COUNTER
 +        this.cfm = cfm;
 +        this.attrs = attrs;
      }
  
 -    protected Type type;
 +    public long measureForPreparedCache(MemoryMeter meter)
 +    {
 +        return meter.measureDeep(this) - meter.measureDeep(cfm);
 +    }
 +
 +    public abstract boolean requireFullClusteringKey();
 +    public abstract ColumnFamily updateForKey(ByteBuffer key, ColumnNameBuilder builder, UpdateParameters params) throws InvalidRequestException;
  
-     public int getBoundsTerms()
 -    private Long timestamp;
 -    private final int timeToLive;
++    public int getBoundTerms()
 +    {
 +        return boundTerms;
 +    }
  
 -    public ModificationStatement(CFName name, Attributes attrs)
 +    public String keyspace()
      {
 -        this(name, attrs.timestamp, attrs.timeToLive);
 +        return cfm.ksName;
      }
  
 -    public ModificationStatement(CFName name, Long timestamp, int timeToLive)
 +    public String columnFamily()
      {
 -        super(name);
 -        this.timestamp = timestamp;
 -        this.timeToLive = timeToLive;
 +        return cfm.cfName;
 +    }
 +
 +    public boolean isCounter()
 +    {
 +        return cfm.getDefaultValidator().isCommutative();
 +    }
 +
 +    public long getTimestamp(long now, List<ByteBuffer> variables) throws InvalidRequestException
 +    {
 +        return attrs.getTimestamp(now, variables);
 +    }
 +
 +    public boolean isTimestampSet()
 +    {
 +        return attrs.isTimestampSet();
 +    }
 +
 +    public int getTimeToLive(List<ByteBuffer> variables) throws InvalidRequestException
 +    {
 +        return attrs.getTimeToLive(variables);
      }
  
      public void checkAccess(ClientState state) throws InvalidRequestException, UnauthorizedException
@@@ -502,156 -199,8 +502,156 @@@
       * @return list of the mutations
       * @throws InvalidRequestException on invalid requests
       */
 -    protected abstract Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now)
 -    throws RequestExecutionException, RequestValidationException;
 +    public Collection<? extends IMutation> getMutations(List<ByteBuffer> variables, boolean local, ConsistencyLevel cl, long now, boolean isBatch)
 +    throws RequestExecutionException, RequestValidationException
 +    {
 +        List<ByteBuffer> keys = buildPartitionKeyNames(variables);
 +        ColumnNameBuilder clusteringPrefix = createClusteringPrefixBuilder(variables);
 +
 +        // Some lists operation requires reading
 +        Map<ByteBuffer, ColumnGroupMap> rows = readRequiredRows(keys, clusteringPrefix, local, cl);
 +        UpdateParameters params = new UpdateParameters(cfm, variables, getTimestamp(now, variables), getTimeToLive(variables), rows);
  
 -    public abstract ParsedStatement.Prepared prepare(ColumnSpecification[] boundNames) throws InvalidRequestException;
 +        Collection<IMutation> mutations = new ArrayList<IMutation>();
 +        for (ByteBuffer key: keys)
 +        {
 +            ThriftValidation.validateKey(cfm, key);
 +            ColumnFamily cf = updateForKey(key, clusteringPrefix, params);
 +            mutations.add(makeMutation(key, cf, cl, isBatch));
 +        }
 +        return mutations;
 +    }
 +
 +    private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
 +    {
 +        RowMutation rm;
 +        if (isBatch)
 +        {
 +            // we might group other mutations together with this one later, so make it mutable
 +            rm = new RowMutation(cfm.ksName, key);
 +            rm.add(cf);
 +        }
 +        else
 +        {
 +            rm = new RowMutation(cfm.ksName, key, cf);
 +        }
 +        return isCounter() ? new CounterMutation(rm, cl) : rm;
 +    }
 +
 +    private ColumnFamily buildConditions(ByteBuffer key, ColumnNameBuilder clusteringPrefix, UpdateParameters params)
 +    throws InvalidRequestException
 +    {
 +        if (ifNotExists)
 +            return null;
 +
 +        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfm);
 +
 +        // CQL row marker
 +        CFDefinition cfDef = cfm.getCfDef();
 +        if (cfDef.isComposite && !cfDef.isCompact && !cfm.isSuper())
 +        {
 +            ByteBuffer name = clusteringPrefix.copy().add(ByteBufferUtil.EMPTY_BYTE_BUFFER).build();
 +            cf.addColumn(params.makeColumn(name, ByteBufferUtil.EMPTY_BYTE_BUFFER));
 +        }
 +
 +        // Conditions
 +        for (Operation condition : columnConditions)
 +            condition.execute(key, cf, clusteringPrefix.copy(), params);
 +
 +        assert !cf.isEmpty();
 +        return cf;
 +    }
 +
 +    public static abstract class Parsed extends CFStatement
 +    {
 +        protected final Attributes.Raw attrs;
 +        private final List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions;
 +        private final boolean ifNotExists;
 +
 +        protected Parsed(CFName name, Attributes.Raw attrs, List<Pair<ColumnIdentifier, Operation.RawUpdate>> conditions, boolean ifNotExists)
 +        {
 +            super(name);
 +            this.attrs = attrs;
 +            this.conditions = conditions == null ? Collections.<Pair<ColumnIdentifier, Operation.RawUpdate>>emptyList() : conditions;
 +            this.ifNotExists = ifNotExists;
 +        }
 +
 +        public ParsedStatement.Prepared prepare() throws InvalidRequestException
 +        {
-             VariableSpecifications boundNames = getBoundsVariables();
++            VariableSpecifications boundNames = getBoundVariables();
 +            ModificationStatement statement = prepare(boundNames);
 +            return new ParsedStatement.Prepared(statement, boundNames);
 +        }
 +
 +        public ModificationStatement prepare(VariableSpecifications boundNames) throws InvalidRequestException
 +        {
 +            CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
 +            CFDefinition cfDef = metadata.getCfDef();
 +
 +            // The collected count in the beginning of preparation.
 +            // Will start at non-zero for statements nested inside a BatchStatement (the second and the further ones).
 +            int collected = boundNames.getCollectedCount();
 +
 +            Attributes preparedAttributes = attrs.prepare(keyspace(), columnFamily());
 +            preparedAttributes.collectMarkerSpecification(boundNames);
 +
 +            ModificationStatement stmt = prepareInternal(cfDef, boundNames, preparedAttributes);
 +
 +            if (ifNotExists || (conditions != null && !conditions.isEmpty()))
 +            {
 +                if (stmt.isCounter())
 +                    throw new InvalidRequestException("Conditional updates are not supported on counter tables");
 +
 +                if (attrs.timestamp != null)
 +                    throw new InvalidRequestException("Cannot provide custom timestamp for conditional update");
 +
 +                if (ifNotExists)
 +                {
 +                    // To have both 'IF NOT EXISTS' and some other conditions doesn't make sense.
 +                    // So far this is enforced by the parser, but let's assert it for sanity if ever the parse changes.
 +                    assert conditions.isEmpty();
 +                    stmt.setIfNotExistCondition();
 +                }
 +                else
 +                {
 +                    for (Pair<ColumnIdentifier, Operation.RawUpdate> entry : conditions)
 +                    {
 +                        CFDefinition.Name name = cfDef.get(entry.left);
 +                        if (name == null)
 +                            throw new InvalidRequestException(String.format("Unknown identifier %s", entry.left));
 +
 +                        /*
 +                         * Lists column names are based on a server-side generated timeuuid. So we can't allow lists
 +                         * operation or that would yield unexpected results (update that should apply wouldn't). So for
 +                         * now, we just refuse lists, which also save use from having to bother about the read that some
 +                         * list operation involve.
 +                         */
 +                        if (name.type instanceof ListType)
 +                            throw new InvalidRequestException(String.format("List operation (%s) are not allowed in conditional updates", name));
 +
 +                        Operation condition = entry.right.prepare(name);
 +                        assert !condition.requiresRead();
 +
 +                        condition.collectMarkerSpecification(boundNames);
 +
 +                        switch (name.kind)
 +                        {
 +                            case KEY_ALIAS:
 +                            case COLUMN_ALIAS:
 +                                throw new InvalidRequestException(String.format("PRIMARY KEY part %s found in SET part", entry.left));
 +                            case VALUE_ALIAS:
 +                            case COLUMN_METADATA:
 +                                stmt.addCondition(condition);
 +                                break;
 +                        }
 +                    }
 +                }
 +            }
 +
 +            stmt.boundTerms = boundNames.getCollectedCount() - collected;
 +            return stmt;
 +        }
 +
 +        protected abstract ModificationStatement prepareInternal(CFDefinition cfDef, VariableSpecifications boundNames, Attributes attrs) throws InvalidRequestException;
 +    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
index ecf8a8a,2d0b4c7..d048327
--- a/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ParsedStatement.java
@@@ -25,11 -25,11 +25,11 @@@ import org.apache.cassandra.exceptions.
  
  public abstract class ParsedStatement
  {
 -    private int boundTerms;
 +    private VariableSpecifications variables;
  
-     public VariableSpecifications getBoundsVariables()
 -    public int getBoundTerms()
++    public VariableSpecifications getBoundVariables()
      {
 -        return boundTerms;
 +        return variables;
      }
  
      // Used by the parser and preparable statement

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index f2904e4,4d40e99..337e8dc
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@@ -44,11 -47,6 +44,11 @@@ public abstract class SchemaAlteringSta
          this.isColumnFamilyLevel = true;
      }
  
-     public int getBoundsTerms()
++    public int getBoundTerms()
 +    {
 +        return 0;
 +    }
 +
      @Override
      public void prepareKeyspace(ClientState state) throws InvalidRequestException
      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index b94e549,4730f18..133444a
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -1067,9 -997,12 +1067,9 @@@ public class SelectStatement implement
          {
              CFMetaData cfm = ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
  
 -            if (parameters.limit <= 0)
 -                throw new InvalidRequestException("LIMIT must be strictly positive");
 -
              CFDefinition cfDef = cfm.getCfDef();
  
-             VariableSpecifications names = getBoundsVariables();
 -            ColumnSpecification[] names = new ColumnSpecification[getBoundTerms()];
++            VariableSpecifications names = getBoundVariables();
  
              // Select clause
              if (parameters.isCount && !selectClause.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index d5baedf,16445f5..30e57d5
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@@ -36,11 -39,6 +36,11 @@@ public class TruncateStatement extends 
          super(name);
      }
  
-     public int getBoundsTerms()
++    public int getBoundTerms()
 +    {
 +        return 0;
 +    }
 +
      public Prepared prepare() throws InvalidRequestException
      {
          return new Prepared(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index db2435f,0db80bf..ee70f9d
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@@ -34,11 -37,6 +34,11 @@@ public class UseStatement extends Parse
          this.keyspace = keyspace;
      }
  
-     public int getBoundsTerms()
++    public int getBoundTerms()
 +    {
 +        return 0;
 +    }
 +
      public Prepared prepare() throws InvalidRequestException
      {
          return new Prepared(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java
index 4e3c372,ec7a37d..07c271b
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@@ -2077,11 -1849,9 +2077,11 @@@ public class CassandraServer implement
                                                                  " (either the query was not prepared on this host (maybe the host has been restarted?)" +
                                                                  " or you have prepared too many queries and it has been evicted from the internal cache)",
                                                                  itemId));
-             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
+             logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundTerms());
  
 -            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, ThriftConversion.fromThrift(cLevel), cState.getQueryState(), bindVariables).toThriftResult();
 +            return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement,
 +                                                                            cState.getQueryState(),
 +                                                                            new QueryOptions(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult();
          }
          catch (RequestExecutionException e)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1435ffd/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index bd95ef3,0000000..487e089
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@@ -1,237 -1,0 +1,236 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.transport.messages;
 +
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.buffer.ChannelBuffers;
 +
 +import org.apache.cassandra.cql3.Attributes;
 +import org.apache.cassandra.cql3.CQLStatement;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.statements.BatchStatement;
 +import org.apache.cassandra.cql3.statements.ModificationStatement;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.exceptions.PreparedQueryNotFoundException;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.*;
 +import org.apache.cassandra.utils.MD5Digest;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +public class BatchMessage extends Message.Request
 +{
 +    public static final Message.Codec<BatchMessage> codec = new Message.Codec<BatchMessage>()
 +    {
 +        public BatchMessage decode(ChannelBuffer body, int version)
 +        {
 +            if (version == 1)
 +                throw new ProtocolException("BATCH messages are not support in version 1 of the protocol");
 +
 +            byte type = body.readByte();
 +            int n = body.readUnsignedShort();
 +            List<Object> queryOrIds = new ArrayList<Object>(n);
 +            List<List<ByteBuffer>> variables = new ArrayList<List<ByteBuffer>>(n);
 +            for (int i = 0; i < n; i++)
 +            {
 +                byte kind = body.readByte();
 +                if (kind == 0)
 +                    queryOrIds.add(CBUtil.readLongString(body));
 +                else if (kind == 1)
 +                    queryOrIds.add(MD5Digest.wrap(CBUtil.readBytes(body)));
 +                else
 +                    throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind);
 +                variables.add(CBUtil.readValueList(body));
 +            }
 +            ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body);
 +            return new BatchMessage(toType(type), queryOrIds, variables, consistency);
 +        }
 +
 +        public void encode(BatchMessage msg, ChannelBuffer dest, int version)
 +        {
 +            int queries = msg.queryOrIdList.size();
 +
 +            dest.writeByte(fromType(msg.type));
 +            dest.writeShort(queries);
 +
 +            for (int i = 0; i < queries; i++)
 +            {
 +                Object q = msg.queryOrIdList.get(i);
 +                dest.writeByte((byte)(q instanceof String ? 0 : 1));
 +                if (q instanceof String)
 +                    CBUtil.writeLongString((String)q, dest);
 +                else
 +                    CBUtil.writeBytes(((MD5Digest)q).bytes, dest);
 +
 +                CBUtil.writeValueList(msg.values.get(i), dest);
 +            }
 +
 +            CBUtil.writeConsistencyLevel(msg.consistency, dest);
 +        }
 +
 +        public int encodedSize(BatchMessage msg, int version)
 +        {
 +            int size = 3; // type + nb queries
 +            for (int i = 0; i < msg.queryOrIdList.size(); i++)
 +            {
 +                Object q = msg.queryOrIdList.get(i);
 +                size += 1 + (q instanceof String
 +                             ? CBUtil.sizeOfLongString((String)q)
 +                             : CBUtil.sizeOfBytes(((MD5Digest)q).bytes));
 +
 +                size += CBUtil.sizeOfValueList(msg.values.get(i));
 +            }
 +            size += CBUtil.sizeOfConsistencyLevel(msg.consistency);
 +            return size;
 +        }
 +
 +        private BatchStatement.Type toType(byte b)
 +        {
 +            if (b == 0)
 +                return BatchStatement.Type.LOGGED;
 +            else if (b == 1)
 +                return BatchStatement.Type.UNLOGGED;
 +            else if (b == 2)
 +                return BatchStatement.Type.COUNTER;
 +            else
 +                throw new ProtocolException("Invalid BATCH message type " + b);
 +        }
 +
 +        private byte fromType(BatchStatement.Type type)
 +        {
 +            switch (type)
 +            {
 +                case LOGGED:   return 0;
 +                case UNLOGGED: return 1;
 +                case COUNTER:  return 2;
 +                default:
 +                    throw new AssertionError();
 +            }
 +        }
 +    };
 +
 +    public final BatchStatement.Type type;
 +    public final List<Object> queryOrIdList;
 +    public final List<List<ByteBuffer>> values;
 +    public final ConsistencyLevel consistency;
 +
 +    public BatchMessage(BatchStatement.Type type, List<Object> queryOrIdList, List<List<ByteBuffer>> values, ConsistencyLevel consistency)
 +    {
 +        super(Message.Type.BATCH);
 +        this.type = type;
 +        this.queryOrIdList = queryOrIdList;
 +        this.values = values;
 +        this.consistency = consistency;
 +    }
 +
 +    public Message.Response execute(QueryState state)
 +    {
 +        try
 +        {
 +            UUID tracingId = null;
 +            if (isTracingRequested())
 +            {
 +                tracingId = UUIDGen.getTimeUUID();
 +                state.prepareTracingSession(tracingId);
 +            }
 +
 +            if (state.traceNextQuery())
 +            {
 +                state.createTracingSession();
 +                // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
 +                Tracing.instance.begin("Execute batch of CQL3 queries", Collections.<String, String>emptyMap());
 +            }
 +
 +            List<ModificationStatement> statements = new ArrayList<ModificationStatement>(queryOrIdList.size());
 +            for (int i = 0; i < queryOrIdList.size(); i++)
 +            {
 +                Object query = queryOrIdList.get(i);
 +                CQLStatement statement;
 +                if (query instanceof String)
 +                {
 +                    statement = QueryProcessor.parseStatement((String)query, state);
 +                }
 +                else
 +                {
 +                    statement = QueryProcessor.getPrepared((MD5Digest)query);
 +                    if (statement == null)
 +                        throw new PreparedQueryNotFoundException((MD5Digest)query);
 +                }
 +
 +                List<ByteBuffer> queryValues = values.get(i);
-                 if (queryValues.size() != statement.getBoundsTerms())
++                if (queryValues.size() != statement.getBoundTerms())
 +                    throw new InvalidRequestException(String.format("There were %d markers(?) in CQL but %d bound variables",
-                                                                     statement.getBoundsTerms(),
++                                                                    statement.getBoundTerms(),
 +                                                                    queryValues.size()));
 +                if (!(statement instanceof ModificationStatement))
 +                    throw new InvalidRequestException("Invalid statement in batch: only UPDATE, INSERT and DELETE statements are allowed.");
 +
 +                ModificationStatement mst = (ModificationStatement)statement;
 +                if (mst.isCounter())
 +                {
 +                    if (type != BatchStatement.Type.COUNTER)
 +                        throw new InvalidRequestException("Cannot include counter statement in a non-counter batch");
 +                }
 +                else
 +                {
 +                    if (type == BatchStatement.Type.COUNTER)
 +                        throw new InvalidRequestException("Cannot include non-counter statement in a counter batch");
 +                }
 +                statements.add(mst);
 +            }
 +
 +            // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchState ctor
 +            // (and no value would be really correct, so we prefer passing a clearly wrong one).
 +            BatchStatement batch = new BatchStatement(-1, type, statements, Attributes.none());
 +            Message.Response response = QueryProcessor.processBatch(batch, consistency, state, values, queryOrIdList);
 +
 +            if (tracingId != null)
 +                response.setTracingId(tracingId);
 +
 +            return response;
 +        }
 +        catch (Exception e)
 +        {
 +            return ErrorMessage.fromException(e);
 +        }
 +        finally
 +        {
 +            Tracing.instance.stopSession();
 +        }
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("BATCH of [");
 +        for (int i = 0; i < queryOrIdList.size(); i++)
 +        {
 +            if (i > 0) sb.append(", ");
 +            sb.append(queryOrIdList.get(i)).append(" with ").append(values.get(i).size()).append(" values");
 +        }
 +        sb.append("] at consistency ").append(consistency);
 +        return sb.toString();
 +    }
 +}


Mime
View raw message