Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 874E06FB8 for ; Sun, 5 Jun 2011 13:43:48 +0000 (UTC) Received: (qmail 54413 invoked by uid 500); 5 Jun 2011 13:43:48 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 54381 invoked by uid 500); 5 Jun 2011 13:43:48 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 54370 invoked by uid 99); 5 Jun 2011 13:43:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Jun 2011 13:43:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Jun 2011 13:43:44 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D54622388A2C; Sun, 5 Jun 2011 13:43:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1132406 [2/2] - in /cassandra/trunk: ./ contrib/ contrib/pig/src/java/org/apache/cassandra/hadoop/pig/ debian/ doc/cql/ drivers/java/src/org/apache/cassandra/cql/jdbc/ drivers/java/test/org/apache/cassandra/cql/ drivers/py/cql/ interface/t... Date: Sun, 05 Jun 2011 13:43:23 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110605134324.D54622388A2C@eris.apache.org> Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/QueryProcessor.java Sun Jun 5 13:43:22 2011 @@ -32,6 +32,8 @@ import java.util.concurrent.TimeoutExcep import com.google.common.base.Predicates; import com.google.common.collect.Maps; +import org.apache.cassandra.db.CounterColumn; +import org.apache.cassandra.db.context.CounterContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +47,8 @@ import org.apache.cassandra.db.filter.Qu import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.db.migration.*; -import org.apache.cassandra.db.migration.avro.CfDef; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; @@ -67,7 +67,7 @@ public class QueryProcessor throws InvalidRequestException, TimedOutException, UnavailableException { QueryPath queryPath = new QueryPath(select.getColumnFamily()); - CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false); + CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily()); List commands = new ArrayList(); // ...of a list of column names @@ -161,7 +161,7 @@ public class QueryProcessor } AbstractBounds bounds = new Bounds(startToken, finishToken); - CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false); + CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily()); // XXX: Our use of Thrift structs internally makes me Sad. :( SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata); validateSlicePredicate(metadata, thriftSlicePredicate); @@ -214,7 +214,7 @@ public class QueryProcessor private static List getIndexedSlices(String keyspace, SelectStatement select) throws TimedOutException, UnavailableException, InvalidRequestException { - CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false); + CFMetaData metadata = validateColumnFamily(keyspace, select.getColumnFamily()); // XXX: Our use of Thrift structs internally (still) makes me Sad. :~( SlicePredicate thriftSlicePredicate = slicePredicateFromSelect(select, metadata); validateSlicePredicate(metadata, thriftSlicePredicate); @@ -261,7 +261,7 @@ public class QueryProcessor throws InvalidRequestException, UnavailableException, TimedOutException { String keyspace = clientState.getKeyspace(); - List rowMutations = new ArrayList(); + List rowMutations = new ArrayList(); List cfamsSeen = new ArrayList(); for (UpdateStatement update : updateStatements) @@ -491,7 +491,7 @@ public class QueryProcessor case SELECT: SelectStatement select = (SelectStatement)statement.statement; clientState.hasColumnFamilyAccess(select.getColumnFamily(), Permission.READ); - metadata = validateColumnFamily(keyspace, select.getColumnFamily(), false); + metadata = validateColumnFamily(keyspace, select.getColumnFamily()); validateSelect(keyspace, select); List rows; @@ -640,7 +640,7 @@ public class QueryProcessor { KsDef ksd = new KsDef(create.getName(), create.getStrategyClass(), - Collections.emptyList()) + Collections.emptyList()) .setStrategy_options(create.getStrategyOptions()); ThriftValidation.validateKsDef(ksd); applyMigrationOnStage(new AddKeyspace(KSMetaData.fromThrift(ksd))); @@ -694,37 +694,58 @@ public class QueryProcessor createIdx.getColumnFamily())); if (oldCfm == null) throw new InvalidRequestException("No such column family: " + createIdx.getColumnFamily()); - + + boolean columnExists = false; ByteBuffer columnName = createIdx.getColumnName().getByteBuffer(); - ColumnDefinition columnDef = oldCfm.getColumn_metadata().get(columnName); - - // Meta-data for this column already exists - if (columnDef != null) + // mutating oldCfm directly would be bad, but mutating a Thrift copy is fine. This also + // sets us up to use validateCfDef to check for index name collisions. + CfDef cf_def = CFMetaData.convertToThrift(oldCfm); + for (ColumnDef cd : cf_def.column_metadata) { - // This column is already indexed, stop, drop, and roll. - if (columnDef.getIndexType() != null) - throw new InvalidRequestException("Index exists"); - // Add index attrs to the existing definition - columnDef.setIndexName(createIdx.getIndexName()); - columnDef.setIndexType(org.apache.cassandra.thrift.IndexType.KEYS); + if (cd.name.equals(columnName)) + { + if (cd.index_type != null) + throw new InvalidRequestException("Index already exists"); + logger.debug("Updating column {} definition for index {}", oldCfm.comparator.getString(columnName), createIdx.getIndexName()); + cd.setIndex_type(IndexType.KEYS); + cd.setIndex_name(createIdx.getIndexName()); + columnExists = true; + break; + } } - // No meta-data, create a new column definition from scratch. - else + if (!columnExists) + throw new InvalidRequestException("No column definition found for column " + oldCfm.comparator.getString(columnName)); + + CFMetaData.addDefaultIndexNames(cf_def); + ThriftValidation.validateCfDef(cf_def, oldCfm); + try { - columnDef = new ColumnDefinition(columnName, - DatabaseDescriptor.getValueValidator(keyspace, - createIdx.getColumnFamily(), - columnName), - org.apache.cassandra.thrift.IndexType.KEYS, - createIdx.getIndexName()); + applyMigrationOnStage(new UpdateColumnFamily(CFMetaData.convertToAvro(cf_def))); + } + catch (ConfigurationException e) + { + InvalidRequestException ex = new InvalidRequestException(e.toString()); + ex.initCause(e); + throw ex; + } + catch (IOException e) + { + InvalidRequestException ex = new InvalidRequestException(e.toString()); + ex.initCause(e); + throw ex; } - CfDef cfamilyDef = CFMetaData.convertToAvro(oldCfm); - cfamilyDef.column_metadata.add(columnDef.deflate()); - + result.type = CqlResultType.VOID; + return result; + + case DROP_INDEX: + DropIndexStatement dropIdx = (DropIndexStatement)statement.statement; + clientState.hasColumnFamilyListAccess(Permission.WRITE); + validateSchemaAgreement(); + try { - applyMigrationOnStage(new UpdateColumnFamily(cfamilyDef)); + applyMigrationOnStage(dropIdx.generateMutation(clientState.getKeyspace())); } catch (ConfigurationException e) { @@ -738,10 +759,10 @@ public class QueryProcessor ex.initCause(e); throw ex; } - + result.type = CqlResultType.VOID; return result; - + case DROP_KEYSPACE: String deleteKeyspace = (String)statement.statement; clientState.hasKeyspaceListAccess(Permission.WRITE); @@ -791,7 +812,35 @@ public class QueryProcessor result.type = CqlResultType.VOID; return result; - + + case ALTER_TABLE: + AlterTableStatement alterTable = (AlterTableStatement) statement.statement; + + System.out.println(alterTable); + + validateColumnFamily(keyspace, alterTable.columnFamily); + clientState.hasColumnFamilyAccess(alterTable.columnFamily, Permission.WRITE); + validateSchemaAgreement(); + + try + { + applyMigrationOnStage(new UpdateColumnFamily(alterTable.getCfDef(keyspace))); + } + catch (ConfigurationException e) + { + InvalidRequestException ex = new InvalidRequestException(e.getMessage()); + ex.initCause(e); + throw ex; + } + catch (IOException e) + { + InvalidRequestException ex = new InvalidRequestException(e.getMessage()); + ex.initCause(e); + throw ex; + } + + result.type = CqlResultType.VOID; + return result; } return null; // We should never get here. @@ -813,7 +862,7 @@ public class QueryProcessor { if (c.isMarkedForDelete()) continue; - thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp())); + thriftColumns.add(thriftify(c)); } } else @@ -840,16 +889,25 @@ public class QueryProcessor { throw new AssertionError(e); } + IColumn c = row.cf.getColumn(name); if (c == null || c.isMarkedForDelete()) thriftColumns.add(new Column().setName(name)); else - thriftColumns.add(new Column(c.name()).setValue(c.value()).setTimestamp(c.timestamp())); + thriftColumns.add(thriftify(c)); } } return thriftColumns; } + private static Column thriftify(IColumn c) + { + ByteBuffer value = (c instanceof CounterColumn) + ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value())) + : c.value(); + return new Column(c.name()).setValue(value).setTimestamp(c.timestamp()); + } + private static String getKeyString(CFMetaData metadata) { String keyString; Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/StatementType.java Sun Jun 5 13:43:22 2011 @@ -24,8 +24,8 @@ import java.util.EnumSet; public enum StatementType { - SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX, - DROP_KEYSPACE, DROP_COLUMNFAMILY; + SELECT, INSERT, UPDATE, BATCH, USE, TRUNCATE, DELETE, CREATE_KEYSPACE, CREATE_COLUMNFAMILY, CREATE_INDEX, DROP_INDEX, + DROP_KEYSPACE, DROP_COLUMNFAMILY, ALTER_TABLE; // Statement types that don't require a keyspace to be set. private static final EnumSet topLevel = EnumSet.of(USE, CREATE_KEYSPACE, DROP_KEYSPACE); Modified: cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/cql/UpdateStatement.java Sun Jun 5 13:43:22 2011 @@ -26,6 +26,8 @@ import java.util.*; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.CounterMutation; +import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.RowMutation; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.marshal.AbstractType; @@ -35,6 +37,7 @@ import org.apache.cassandra.thrift.Inval import static org.apache.cassandra.cql.QueryProcessor.validateColumn; +import static org.apache.cassandra.cql.Operation.OperationType; import static org.apache.cassandra.thrift.ThriftValidation.validateColumnFamily; /** @@ -43,7 +46,7 @@ import static org.apache.cassandra.thrif */ public class UpdateStatement extends AbstractModification { - private Map columns; + private Map columns; private List columnNames, columnValues; private List keys; @@ -57,7 +60,7 @@ public class UpdateStatement extends Abs * @param attrs additional attributes for statement (CL, timestamp, timeToLive) */ public UpdateStatement(String columnFamily, - Map columns, + Map columns, List keys, Attributes attrs) { @@ -113,17 +116,28 @@ public class UpdateStatement extends Abs } /** {@inheritDoc} */ - public List prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException + public List prepareRowMutations(String keyspace, ClientState clientState) throws InvalidRequestException { return prepareRowMutations(keyspace, clientState, null); } /** {@inheritDoc} */ - public List prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException + public List prepareRowMutations(String keyspace, ClientState clientState, Long timestamp) throws InvalidRequestException { List cfamsSeen = new ArrayList(); - CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, false); + boolean hasCommutativeOperation = false; + + for (Map.Entry column : getColumns().entrySet()) + { + if (!column.getValue().isUnary()) + hasCommutativeOperation = true; + + if (hasCommutativeOperation && column.getValue().isUnary()) + throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed."); + } + + CFMetaData metadata = validateColumnFamily(keyspace, columnFamily, hasCommutativeOperation); // Avoid unnecessary authorizations. if (!(cfamsSeen.contains(columnFamily))) @@ -132,7 +146,7 @@ public class UpdateStatement extends Abs cfamsSeen.add(columnFamily); } - List rowMutations = new LinkedList(); + List rowMutations = new LinkedList(); for (Term key: keys) { @@ -154,43 +168,61 @@ public class UpdateStatement extends Abs * * @throws InvalidRequestException on the wrong request */ - private RowMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException - { - RowMutation rm = new RowMutation(keyspace, key); - - mutationForKey(rm, keyspace, metadata, timestamp); - - return rm; - } - - /** {@inheritDoc} */ - public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp) throws InvalidRequestException - { - return mutationForKey(keyspace, key, validateColumnFamily(keyspace, columnFamily, false), timestamp); - } - - /** {@inheritDoc} */ - public void mutationForKey(RowMutation mutation, String keyspace, Long timestamp) throws InvalidRequestException - { - mutationForKey(mutation, keyspace, validateColumnFamily(keyspace, columnFamily, false), timestamp); - } - - private void mutationForKey(RowMutation mutation, String keyspace, CFMetaData metadata, Long timestamp) throws InvalidRequestException + private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData metadata, Long timestamp) throws InvalidRequestException { AbstractType comparator = getComparator(keyspace); - for (Map.Entry column : getColumns().entrySet()) + // if true we need to wrap RowMutation into CounterMutation + boolean hasCounterColumn = false; + RowMutation rm = new RowMutation(keyspace, key); + + for (Map.Entry column : getColumns().entrySet()) { ByteBuffer colName = column.getKey().getByteBuffer(comparator); - ByteBuffer colValue = column.getValue().getByteBuffer(getValueValidator(keyspace, colName)); + Operation op = column.getValue(); - validateColumn(metadata, colName, colValue); + if (op.isUnary()) + { + if (hasCounterColumn) + throw new InvalidRequestException("Mix of commutative and non-commutative operations is not allowed."); + + ByteBuffer colValue = op.a.getByteBuffer(getValueValidator(keyspace, colName)); + + validateColumn(metadata, colName, colValue); + rm.add(new QueryPath(columnFamily, null, colName), + colValue, + (timestamp == null) ? getTimestamp() : timestamp, + getTimeToLive()); + } + else + { + hasCounterColumn = true; + + if (!column.getKey().getText().equals(op.a.getText())) + throw new InvalidRequestException("Only expressions like X = X + are supported."); + + long value; + + try + { + value = Long.parseLong(op.b.getText()); + + if (op.type == OperationType.MINUS) + { + value *= -1; + } + } + catch (NumberFormatException e) + { + throw new InvalidRequestException(String.format("'%s' is an invalid value, should be a long.", + op.b.getText())); + } - mutation.add(new QueryPath(columnFamily, null, colName), - colValue, - (timestamp == null) ? getTimestamp() : timestamp, - getTimeToLive()); + rm.addCounter(new QueryPath(columnFamily, null, colName), value); + } } + + return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm; } public String getColumnFamily() @@ -203,8 +235,8 @@ public class UpdateStatement extends Abs { return keys; } - - public Map getColumns() throws InvalidRequestException + + public Map getColumns() throws InvalidRequestException { // Created from an UPDATE if (columns != null) @@ -218,11 +250,11 @@ public class UpdateStatement extends Abs if (columnNames.size() < 1) throw new InvalidRequestException("no columns specified for INSERT"); - columns = new HashMap(); + columns = new HashMap(); for (int i = 0; i < columnNames.size(); i++) - columns.put(columnNames.get(i), columnValues.get(i)); - + columns.put(columnNames.get(i), new Operation(columnValues.get(i))); + return columns; } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Sun Jun 5 13:43:22 2011 @@ -511,7 +511,7 @@ public class ColumnFamilyStore implement if (cfm != null) // secondary indexes aren't stored in DD. { for (ColumnDefinition def : cfm.getColumn_metadata().values()) - scrubDataDirectories(table, cfm.indexName(def)); + scrubDataDirectories(table, cfm.indexColumnFamilyName(def)); } } @@ -1802,8 +1802,12 @@ public class ColumnFamilyStore implement */ public Future truncate() throws IOException { - // snapshot will also flush, but we want to truncate the most possible, and anything in a flush written - // after truncateAt won't be truncated. + // We have two goals here: + // - truncate should delete everything written before truncate was invoked + // - but not delete anything that isn't part of the snapshot we create. + // We accomplish this by first flushing manually, then snapshotting, and + // recording the timestamp IN BETWEEN those actions. Any sstables created + // with this timestamp or greater time, will not be marked for delete. try { forceBlockingFlush(); @@ -1812,33 +1816,20 @@ public class ColumnFamilyStore implement { throw new RuntimeException(e); } - - final long truncatedAt = System.currentTimeMillis(); - snapshot(Table.getTimestampedSnapshotName("before-truncate")); - - Runnable runnable = new WrappedRunnable() + // sleep a little to make sure that our truncatedAt comes after any sstable + // that was part of the flushed we forced; otherwise on a tie, it won't get deleted. + try { - public void runMayThrow() throws InterruptedException, IOException - { - // putting markCompacted on the commitlogUpdater thread ensures it will run - // after any compactions that were in progress when truncate was called, are finished - for (ColumnFamilyStore cfs : concatWithIndexes()) - { - List truncatedSSTables = new ArrayList(); - for (SSTableReader sstable : cfs.getSSTables()) - { - if (!sstable.newSince(truncatedAt)) - truncatedSSTables.add(sstable); - } - cfs.data.markCompacted(truncatedSSTables); - } - - // Invalidate row cache - invalidateRowCache(); - } - }; + Thread.sleep(100); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + long truncatedAt = System.currentTimeMillis(); + snapshot(Table.getTimestampedSnapshotName("before-truncate")); - return postFlushExecutor.submit(runnable); + return CompactionManager.instance.submitTruncate(this, truncatedAt); } // if this errors out, we are in a world of hurt. Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Sun Jun 5 13:43:22 2011 @@ -47,6 +47,7 @@ import org.apache.cassandra.io.*; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.util.BufferedRandomAccessFile; import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.MemoryInputStream; import org.apache.cassandra.service.AntiEntropyService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.OperationType; @@ -1149,6 +1150,30 @@ public class CompactionManager implement return executor.submit(runnable); } + public Future submitTruncate(final ColumnFamilyStore main, final long truncatedAt) + { + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws InterruptedException, IOException + { + for (ColumnFamilyStore cfs : main.concatWithIndexes()) + { + List truncatedSSTables = new ArrayList(); + for (SSTableReader sstable : cfs.getSSTables()) + { + if (!sstable.newSince(truncatedAt)) + truncatedSSTables.add(sstable); + } + cfs.markCompacted(truncatedSSTables); + } + + main.invalidateRowCache(); + } + }; + + return executor.submit(runnable); + } + private static int getDefaultGcBefore(ColumnFamilyStore cfs) { return cfs.isIndex() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Sun Jun 5 13:43:22 2011 @@ -22,20 +22,17 @@ package org.apache.cassandra.db; import java.io.File; import java.io.IOError; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import com.google.common.collect.Iterables; -import org.apache.commons.collections.set.UnmodifiableSet; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.config.DatabaseDescriptor; - import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.Pair; @@ -451,18 +448,17 @@ public class DataTracker public final Set sstables; public final Set compacting; - public View(Memtable memtable, Set pendingFlush, Set sstables, Set compacting) + View(Memtable memtable, Set pendingFlush, Set sstables, Set compacting) { this.memtable = memtable; - this.memtablesPendingFlush = pendingFlush instanceof UnmodifiableSet ? pendingFlush : Collections.unmodifiableSet(pendingFlush); - this.sstables = sstables instanceof UnmodifiableSet ? sstables : Collections.unmodifiableSet(sstables); - this.compacting = compacting instanceof UnmodifiableSet ? compacting : Collections.unmodifiableSet(compacting); + this.memtablesPendingFlush = pendingFlush; + this.sstables = sstables; + this.compacting = compacting; } public View switchMemtable(Memtable newMemtable) { - Set newPending = new HashSet(memtablesPendingFlush); - newPending.add(memtable); + Set newPending = ImmutableSet.builder().addAll(memtablesPendingFlush).add(memtable).build(); return new View(newMemtable, newPending, sstables, compacting); } @@ -473,32 +469,27 @@ public class DataTracker public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable) { - Set newPendings = new HashSet(memtablesPendingFlush); - Set newSSTables = new HashSet(sstables); - newPendings.remove(flushedMemtable); - newSSTables.add(newSSTable); - return new View(memtable, newPendings, newSSTables, compacting); + Set newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable))); + Set newSSTables = ImmutableSet.builder().addAll(sstables).add(newSSTable).build(); + return new View(memtable, newPending, newSSTables, compacting); } public View replace(Collection oldSSTables, Iterable replacements) { - Set sstablesNew = new HashSet(sstables); - Iterables.addAll(sstablesNew, replacements); - sstablesNew.removeAll(oldSSTables); - return new View(memtable, memtablesPendingFlush, sstablesNew, compacting); + Sets.SetView remaining = Sets.difference(sstables, ImmutableSet.copyOf(oldSSTables)); + Set newSSTables = ImmutableSet.builder().addAll(remaining).addAll(replacements).build(); + return new View(memtable, memtablesPendingFlush, newSSTables, compacting); } public View markCompacting(Collection tomark) { - Set compactingNew = new HashSet(compacting); - compactingNew.addAll(tomark); + Set compactingNew = ImmutableSet.builder().addAll(sstables).addAll(tomark).build(); return new View(memtable, memtablesPendingFlush, sstables, compactingNew); } public View unmarkCompacting(Collection tounmark) { - Set compactingNew = new HashSet(compacting); - compactingNew.removeAll(tounmark); + Set compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark))); return new View(memtable, memtablesPendingFlush, sstables, compactingNew); } } Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sun Jun 5 13:43:22 2011 @@ -731,23 +731,6 @@ public class Table return Iterables.transform(DatabaseDescriptor.getTables(), transformer); } - /** - * Performs a synchronous truncate operation, effectively deleting all data - * from the column family cfname - * @param cfname - * @throws IOException - * @throws ExecutionException - * @throws InterruptedException - */ - public void truncate(String cfname) throws InterruptedException, ExecutionException, IOException - { - logger.debug("Truncating..."); - ColumnFamilyStore cfs = getColumnFamilyStore(cfname); - // truncate, blocking - cfs.truncate().get(); - logger.debug("Truncation done."); - } - @Override public String toString() { Modified: cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Sun Jun 5 13:43:22 2011 @@ -22,7 +22,6 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOError; import java.io.IOException; -import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +46,8 @@ public class TruncateVerbHandler impleme try { - Table.open(t.keyspace).truncate(t.columnFamily); + ColumnFamilyStore cfs = Table.open(t.keyspace).getColumnFamilyStore(t.columnFamily); + cfs.truncate().get(); } catch (Exception e) { Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Sun Jun 5 13:43:22 2011 @@ -38,6 +38,8 @@ public class IncomingTcpConnection exten { private static Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class); + private static final int CHUNK_SIZE = 1024 * 1024; + private Socket socket; public IncomingTcpConnection(Socket socket) @@ -97,8 +99,13 @@ public class IncomingTcpConnection exten { int size = input.readInt(); byte[] contentBytes = new byte[size]; - input.readFully(contentBytes); - + // readFully allocates a direct buffer the size of the chunk it is asked to read, + // so we cap that at CHUNK_SIZE. See https://issues.apache.org/jira/browse/CASSANDRA-2654 + int remainder = size % CHUNK_SIZE; + for (int offset = 0; offset < size - remainder; offset += CHUNK_SIZE) + input.readFully(contentBytes, offset, CHUNK_SIZE); + input.readFully(contentBytes, size - remainder, remainder); + if (version > MessagingService.version_) logger.info("Received connection from newer protocol version. Ignorning message."); else Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Sun Jun 5 13:43:22 2011 @@ -799,7 +799,8 @@ public class CassandraServer implements { logger.debug("add_column_family"); state().hasColumnFamilyListAccess(Permission.WRITE); - ThriftValidation.validateCfDef(cf_def); + CFMetaData.addDefaultIndexNames(cf_def); + ThriftValidation.validateCfDef(cf_def, null); validateSchemaAgreement(); try @@ -866,10 +867,11 @@ public class CassandraServer implements try { Collection cfDefs = new ArrayList(ks_def.cf_defs.size()); - for (CfDef cfDef : ks_def.cf_defs) + for (CfDef cf_def : ks_def.cf_defs) { - ThriftValidation.validateCfDef(cfDef); - cfDefs.add(CFMetaData.fromThrift(cfDef)); + CFMetaData.addDefaultIndexNames(cf_def); + ThriftValidation.validateCfDef(cf_def, null); + cfDefs.add(CFMetaData.fromThrift(cf_def)); } ThriftValidation.validateKsDef(ks_def); @@ -953,11 +955,10 @@ public class CassandraServer implements { logger.debug("update_column_family"); state().hasColumnFamilyListAccess(Permission.WRITE); - ThriftValidation.validateCfDef(cf_def); if (cf_def.keyspace == null || cf_def.name == null) throw new InvalidRequestException("Keyspace and CF name must be set."); CFMetaData oldCfm = DatabaseDescriptor.getCFMetaData(CFMetaData.getId(cf_def.keyspace, cf_def.name)); - if (oldCfm == null) + if (oldCfm == null) throw new InvalidRequestException("Could not find column family definition to modify."); validateSchemaAgreement(); Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/thrift/ThriftValidation.java Sun Jun 5 13:43:22 2011 @@ -29,6 +29,7 @@ import org.apache.cassandra.db.marshal.A import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.MarshalException; import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Token; @@ -513,7 +514,7 @@ public class ThriftValidation throw new InvalidRequestException("No indexed columns present in index clause with operator EQ"); } - public static void validateCfDef(CfDef cf_def) throws InvalidRequestException + public static void validateCfDef(CfDef cf_def, CFMetaData old) throws InvalidRequestException { try { @@ -533,6 +534,22 @@ public class ThriftValidation } } + if (cf_def.key_alias != null) + { + if (!cf_def.key_alias.hasRemaining()) + throw new InvalidRequestException("key_alias may not be empty"); + try + { + // it's hard to use a key in a select statement if we can't type it. + // for now let's keep it simple and require ascii. + AsciiType.instance.validate(cf_def.key_alias); + } + catch (MarshalException e) + { + throw new InvalidRequestException("Key aliases must be ascii"); + } + } + ColumnFamilyType cfType = ColumnFamilyType.create(cf_def.column_type); if (cfType == null) throw new InvalidRequestException("invalid column type " + cf_def.column_type); @@ -550,16 +567,17 @@ public class ThriftValidation ? TypeParser.parse(cf_def.comparator_type) : TypeParser.parse(cf_def.subcomparator_type); + // initialize a set of names NOT in the CF under consideration Set indexNames = new HashSet(); - for (ColumnDef c : cf_def.column_metadata) + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - // Ensure that given idx_names and auto_generated idx_names cannot collide - CFMetaData cfm = CFMetaData.fromThrift(cf_def); - String idxName = cfm.indexName(ColumnDefinition.fromColumnDef(c)); - if (indexNames.contains(idxName)) - throw new InvalidRequestException("Duplicate index names " + idxName); - indexNames.add(idxName); + if (!cfs.getColumnFamilyName().equals(cf_def.name)) + for (ColumnDefinition cd : cfs.metadata.getColumn_metadata().values()) + indexNames.add(cd.getIndexName()); + } + for (ColumnDef c : cf_def.column_metadata) + { TypeParser.parse(c.validation_class); try @@ -572,11 +590,31 @@ public class ThriftValidation ByteBufferUtil.bytesToHex(c.name), cf_def.comparator_type)); } - if ((c.index_name != null) && (c.index_type == null)) - throw new ConfigurationException("index_name cannot be set without index_type"); - - if (cfType == ColumnFamilyType.Super && c.index_type != null) - throw new InvalidRequestException("Secondary indexes are not supported on supercolumns"); + if (c.index_type == null) + { + if (c.index_name != null) + throw new ConfigurationException("index_name cannot be set without index_type"); + } + else + { + if (cfType == ColumnFamilyType.Super) + throw new InvalidRequestException("Secondary indexes are not supported on supercolumns"); + assert c.index_name != null; // should have a default set by now if none was provided + if (!Migration.isLegalName(c.index_name)) + throw new InvalidRequestException("Illegal index name " + c.index_name); + // check index names against this CF _and_ globally + if (indexNames.contains(c.index_name)) + throw new InvalidRequestException("Duplicate index name " + c.index_name); + indexNames.add(c.index_name); + + ColumnDefinition oldCd = old == null ? null : old.getColumnDefinition(c.name); + if (oldCd != null && oldCd.getIndexType() != null) + { + assert oldCd.getIndexName() != null; + if (!oldCd.getIndexName().equals(c.index_name)) + throw new InvalidRequestException("Cannot modify index name"); + } + } } validateMinMaxCompactionThresholds(cf_def); validateMemtableSettings(cf_def); Modified: cassandra/trunk/test/system/test_cql.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_cql.py?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/test/system/test_cql.py (original) +++ cassandra/trunk/test/system/test_cql.py Sun Jun 5 13:43:22 2011 @@ -75,6 +75,10 @@ def load_sample(dbconn): CREATE COLUMNFAMILY IndexedA (KEY text PRIMARY KEY, birthdate int) WITH comparator = ascii AND default_validation = ascii; """) + dbconn.execute(""" + CREATE COLUMNFAMILY CounterCF (KEY text PRIMARY KEY, count_me counter) + WITH comparator = ascii AND default_validation = counter; + """) dbconn.execute("CREATE INDEX ON IndexedA (birthdate)") query = "UPDATE StandardString1 SET :c1 = :v1, :c2 = :v2 WHERE KEY = :key" @@ -526,7 +530,7 @@ class TestCql(ThriftTester): "creating column indexes" cursor = init() cursor.execute("USE Keyspace1") - cursor.execute("CREATE COLUMNFAMILY CreateIndex1 (KEY text PRIMARY KEY)") + cursor.execute("CREATE COLUMNFAMILY CreateIndex1 (KEY text PRIMARY KEY, items text, stuff int)") cursor.execute("CREATE INDEX namedIndex ON CreateIndex1 (items)") cursor.execute("CREATE INDEX ON CreateIndex1 (stuff)") @@ -535,10 +539,9 @@ class TestCql(ThriftTester): cfam = [i for i in ksdef.cf_defs if i.name == "CreateIndex1"][0] items = [i for i in cfam.column_metadata if i.name == "items"][0] stuff = [i for i in cfam.column_metadata if i.name == "stuff"][0] - assert items.index_name == "namedIndex", "missing index (or name)" + assert items.index_name == "namedIndex", items.index_name assert items.index_type == 0, "missing index" - assert not stuff.index_name, \ - "index_name should be unset, not %s" % stuff.index_name + assert stuff.index_name != None, "index_name should be set" assert stuff.index_type == 0, "missing index" # already indexed @@ -546,6 +549,34 @@ class TestCql(ThriftTester): cursor.execute, "CREATE INDEX ON CreateIndex1 (stuff)") + def test_drop_indexes(self): + "droping indexes on columns" + cursor = init() + cursor.execute("""CREATE KEYSPACE DropIndexTests WITH strategy_options:replication_factor = '1' + AND strategy_class = 'SimpleStrategy';""") + cursor.execute("USE DropIndexTests") + cursor.execute("CREATE COLUMNFAMILY IndexedCF (KEY text PRIMARY KEY, n text)") + cursor.execute("CREATE INDEX namedIndex ON IndexedCF (n)") + + ksdef = thrift_client.describe_keyspace("DropIndexTests") + columns = ksdef.cf_defs[0].column_metadata + + assert columns[0].index_name == "namedIndex" + assert columns[0].index_type == 0 + + # testing "DROP INDEX " + cursor.execute("DROP INDEX namedIndex") + + ksdef = thrift_client.describe_keyspace("DropIndexTests") + columns = ksdef.cf_defs[0].column_metadata + + assert columns[0].index_type == None + assert columns[0].index_name == None + + assert_raises(cql.ProgrammingError, + cursor.execute, + "DROP INDEX undefIndex") + def test_time_uuid(self): "store and retrieve time-based (type 1) uuids" cursor = init() @@ -1006,3 +1037,151 @@ class TestCql(ThriftTester): r = cursor.fetchone() assert len(r) == 1, "expected 0 results, got %d" % len(r) + + def test_alter_table_statement(self): + "test ALTER TABLE statement" + cursor = init() + cursor.execute(""" + CREATE KEYSPACE AlterTableKS WITH strategy_options:replication_factor = '1' + AND strategy_class = 'SimpleStrategy'; + """) + cursor.execute("USE AlterTableKS;") + + cursor.execute(""" + CREATE COLUMNFAMILY NewCf1 (KEY varint PRIMARY KEY) WITH default_validation = ascii; + """) + + # TODO: temporary (until this can be done with CQL). + ksdef = thrift_client.describe_keyspace("AlterTableKS") + assert len(ksdef.cf_defs) == 1, \ + "expected 1 column family total, found %d" % len(ksdef.cf_defs) + cfam = ksdef.cf_defs[0] + + assert len(cfam.column_metadata) == 0 + + # testing "add a new column" + cursor.execute("ALTER TABLE NewCf1 ADD name varchar") + + ksdef = thrift_client.describe_keyspace("AlterTableKS") + assert len(ksdef.cf_defs) == 1, \ + "expected 1 column family total, found %d" % len(ksdef.cf_defs) + columns = ksdef.cf_defs[0].column_metadata + + assert len(columns) == 1 + assert columns[0].name == 'name' + assert columns[0].validation_class == 'org.apache.cassandra.db.marshal.UTF8Type' + + # testing "alter a column type" + cursor.execute("ALTER TABLE NewCf1 ALTER name TYPE ascii") + + ksdef = thrift_client.describe_keyspace("AlterTableKS") + assert len(ksdef.cf_defs) == 1, \ + "expected 1 column family total, found %d" % len(ksdef.cf_defs) + columns = ksdef.cf_defs[0].column_metadata + + assert len(columns) == 1 + assert columns[0].name == 'name' + assert columns[0].validation_class == 'org.apache.cassandra.db.marshal.AsciiType' + + # alter column with unknown validator + assert_raises(cql.ProgrammingError, + cursor.execute, + "ALTER TABLE NewCf1 ADD name utf8") + + # testing 'drop an existing column' + cursor.execute("ALTER TABLE NewCf1 DROP name") + + ksdef = thrift_client.describe_keyspace("AlterTableKS") + assert len(ksdef.cf_defs) == 1, \ + "expected 1 column family total, found %d" % len(ksdef.cf_defs) + columns = ksdef.cf_defs[0].column_metadata + + assert len(columns) == 0 + + # add column with unknown validator + assert_raises(cql.ProgrammingError, + cursor.execute, + "ALTER TABLE NewCf1 ADD name utf8") + + # alter not existing column + assert_raises(cql.ProgrammingError, + cursor.execute, + "ALTER TABLE NewCf1 ALTER name TYPE uuid") + + # drop not existing column + assert_raises(cql.ProgrammingError, + cursor.execute, + "ALTER TABLE NewCf1 DROP name") + + def test_counter_column_support(self): + "update statement should be able to work with counter columns" + cursor = init() + + # increment counter + cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'") + cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'") + assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount + colnames = [col_d[0] for col_d in cursor.description] + + assert colnames[1] == "count_me", \ + "unrecognized name '%s'" % colnames[1] + + r = cursor.fetchone() + assert r[1] == 2, \ + "unrecognized value '%s'" % r[1] + + cursor.execute("UPDATE CounterCF SET count_me = count_me + 2 WHERE key = 'counter1'") + cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'") + assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount + colnames = [col_d[0] for col_d in cursor.description] + + assert colnames[1] == "count_me", \ + "unrecognized name '%s'" % colnames[1] + + r = cursor.fetchone() + assert r[1] == 4, \ + "unrecognized value '%s'" % r[1] + + # decrement counter + cursor.execute("UPDATE CounterCF SET count_me = count_me - 4 WHERE key = 'counter1'") + cursor.execute("SELECT * FROM CounterCF WHERE KEY = 'counter1'") + assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount + colnames = [col_d[0] for col_d in cursor.description] + + assert colnames[1] == "count_me", \ + "unrecognized name '%s'" % colnames[1] + + r = cursor.fetchone() + assert r[1] == 0, \ + "unrecognized value '%s'" % r[1] + + cursor.execute("SELECT * FROM CounterCF") + assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount + colnames = [col_d[0] for col_d in cursor.description] + + assert colnames[1] == "count_me", \ + "unrecognized name '%s'" % colnames[1] + + r = cursor.fetchone() + assert r[1] == 0, \ + "unrecognized value '%s'" % r[1] + + # deleting a counter column + cursor.execute("DELETE count_me FROM CounterCF WHERE KEY = 'counter1'") + cursor.execute("SELECT * FROM CounterCF") + assert cursor.rowcount == 1, "expected 1 results, got %d" % cursor.rowcount + colnames = [col_d[0] for col_d in cursor.description] + assert len(colnames) == 1 + + r = cursor.fetchone() + assert len(r) == 1 + + # can't mix counter and normal statements + assert_raises(cql.ProgrammingError, + cursor.execute, + "UPDATE CounterCF SET count_me = count_me + 2, x = 'a' WHERE key = 'counter1'") + + # column names must match + assert_raises(cql.ProgrammingError, + cursor.execute, + "UPDATE CounterCF SET count_me = count_not_me + 2 WHERE key = 'counter1'") Modified: cassandra/trunk/test/system/test_thrift_server.py URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/test/system/test_thrift_server.py (original) +++ cassandra/trunk/test/system/test_thrift_server.py Sun Jun 5 13:43:22 2011 @@ -1421,7 +1421,7 @@ class TestMutations(ThriftTester): client.system_update_column_family(modified_cf) # Add a second indexed CF ... - birthdate_coldef = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, 'birthdate_index') + birthdate_coldef = ColumnDef('birthdate', 'BytesType', IndexType.KEYS, 'birthdate2_index') age_coldef = ColumnDef('age', 'BytesType', IndexType.KEYS, 'age_index') cfdef = CfDef('Keyspace1', 'BlankCF2', column_metadata=[birthdate_coldef, age_coldef]) client.system_add_column_family(cfdef) Modified: cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/SchemaLoader.java Sun Jun 5 13:43:22 2011 @@ -21,6 +21,7 @@ package org.apache.cassandra; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.commons.lang.NotImplementedException; import com.google.common.base.Charsets; @@ -249,7 +250,7 @@ public class SchemaLoader {{ ByteBuffer cName = ByteBuffer.wrap("birthdate".getBytes(Charsets.UTF_8)); IndexType keys = withIdxType ? IndexType.KEYS : null; - put(cName, new ColumnDefinition(cName, LongType.instance, keys, null)); + put(cName, new ColumnDefinition(cName, LongType.instance, keys, ByteBufferUtil.bytesToHex(cName))); }}); } private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp) Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Session.java Sun Jun 5 13:43:22 2011 @@ -18,6 +18,8 @@ package org.apache.cassandra.stress; import java.io.*; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -34,7 +36,7 @@ import org.apache.thrift.transport.TFram import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; -public class Session +public class Session implements Serializable { // command line options public static final Options availableOptions = new Options(); @@ -74,6 +76,7 @@ public class Session availableOptions.addOption("O", "strategy-properties", true, "Replication strategy properties in the following format :,:,..."); availableOptions.addOption("W", "no-replicate-on-write",false, "Set replicate_on_write to false for counters. Only counter add with CL=ONE will work"); availableOptions.addOption("V", "average-size-values", false, "Generate column values of average rather than specific size"); + availableOptions.addOption("T", "send-to", true, "Send this as a request to the stress daemon at specified address."); } private int numKeys = 1000 * 1000; @@ -95,7 +98,7 @@ public class Session private boolean replicateOnWrite = true; private boolean ignoreErrors = false; - private PrintStream out = System.out; + private final String outFileName; private IndexType indexType = null; private Stress.Operations operation = Stress.Operations.INSERT; @@ -110,6 +113,8 @@ public class Session protected int mean; protected float sigma; + public final InetAddress sendToDaemon; + public Session(String[] arguments) throws IllegalArgumentException { float STDev = 0.1f; @@ -181,17 +186,7 @@ public class Session if (cmd.hasOption("r")) random = true; - if (cmd.hasOption("f")) - { - try - { - out = new PrintStream(new FileOutputStream(cmd.getOptionValue("f"))); - } - catch (FileNotFoundException e) - { - System.out.println(e.getMessage()); - } - } + outFileName = (cmd.hasOption("f")) ? cmd.getOptionValue("f") : null; if (cmd.hasOption("p")) port = Integer.parseInt(cmd.getOptionValue("p")); @@ -264,6 +259,17 @@ public class Session replicateOnWrite = false; averageSizeValues = cmd.hasOption("V"); + + try + { + sendToDaemon = cmd.hasOption("send-to") + ? InetAddress.getByName(cmd.getOptionValue("send-to")) + : null; + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); + } } catch (ParseException e) { @@ -360,7 +366,14 @@ public class Session public PrintStream getOutputStream() { - return out; + try + { + return (outFileName == null) ? System.out : new PrintStream(new FileOutputStream(outFileName)); + } + catch (FileNotFoundException e) + { + throw new RuntimeException(e.getMessage(), e); + } } public int getProgressInterval() @@ -432,16 +445,16 @@ public class Session try { client.system_add_keyspace(keyspace); - out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length)); + System.out.println(String.format("Created keyspaces. Sleeping %ss for propagation.", nodes.length)); Thread.sleep(nodes.length * 1000); // seconds } catch (InvalidRequestException e) { - out.println("Unable to create stress keyspace: " + e.getWhy()); + System.err.println("Unable to create stress keyspace: " + e.getWhy()); } catch (Exception e) { - out.println(e.getMessage()); + System.err.println(e.getMessage()); } } Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/Stress.java Sun Jun 5 13:43:22 2011 @@ -17,15 +17,12 @@ */ package org.apache.cassandra.stress; -import org.apache.cassandra.stress.operations.*; -import org.apache.cassandra.stress.util.Operation; -import org.apache.cassandra.thrift.Cassandra; import org.apache.commons.cli.Option; -import java.io.PrintStream; +import java.io.*; +import java.net.Socket; +import java.net.SocketException; import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.SynchronousQueue; public final class Stress { @@ -36,17 +33,10 @@ public final class Stress public static Session session; public static Random randomizer = new Random(); - - /** - * Producer-Consumer model: 1 producer, N consumers - */ - private static final BlockingQueue operations = new SynchronousQueue(true); + private static volatile boolean stopped = false; public static void main(String[] arguments) throws Exception { - long latency, oldLatency; - int epoch, total, oldTotal, keyCount, oldKeyCount; - try { session = new Session(arguments); @@ -57,111 +47,49 @@ public final class Stress return; } - // creating keyspace and column families - if (session.getOperation() == Operations.INSERT || session.getOperation() == Operations.COUNTER_ADD) - { - session.createKeySpaces(); - } - - int threadCount = session.getThreads(); - Thread[] consumers = new Thread[threadCount]; - PrintStream out = session.getOutputStream(); - - out.println("total,interval_op_rate,interval_key_rate,avg_latency,elapsed_time"); + PrintStream outStream = session.getOutputStream(); - int itemsPerThread = session.getKeysPerThread(); - int modulo = session.getNumKeys() % threadCount; - - // creating required type of the threads for the test - for (int i = 0; i < threadCount; i++) + if (session.sendToDaemon != null) { - if (i == threadCount - 1) - itemsPerThread += modulo; // last one is going to handle N + modulo items - - consumers[i] = new Consumer(itemsPerThread); - } - - new Producer().start(); - - // starting worker threads - for (int i = 0; i < threadCount; i++) - { - consumers[i].start(); - } + Socket socket = new Socket(session.sendToDaemon, 2159); - // initialization of the values - boolean terminate = false; - latency = 0; - epoch = total = keyCount = 0; + ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); + BufferedReader inp = new BufferedReader(new InputStreamReader(socket.getInputStream())); - int interval = session.getProgressInterval(); - int epochIntervals = session.getProgressInterval() * 10; - long testStartTime = System.currentTimeMillis(); + Runtime.getRuntime().addShutdownHook(new ShutDown(socket, out)); - while (!terminate) - { - Thread.sleep(100); - - int alive = 0; - for (Thread thread : consumers) - if (thread.isAlive()) alive++; + out.writeObject(session); - if (alive == 0) - terminate = true; + String line; - epoch++; - - if (terminate || epoch > epochIntervals) + try { - epoch = 0; - - oldTotal = total; - oldLatency = latency; - oldKeyCount = keyCount; - - total = session.operations.get(); - keyCount = session.keys.get(); - latency = session.latency.get(); + while (!socket.isClosed() && (line = inp.readLine()) != null) + { + if (line.equals("END")) + { + out.writeInt(1); + break; + } - int opDelta = total - oldTotal; - int keyDelta = keyCount - oldKeyCount; - double latencyDelta = latency - oldLatency; + outStream.println(line); + } + } + catch (SocketException e) + { + if (!stopped) + e.printStackTrace(); + } - long currentTimeInSeconds = (System.currentTimeMillis() - testStartTime) / 1000; - String formattedDelta = (opDelta > 0) ? Double.toString(latencyDelta / (opDelta * 1000)) : "NaN"; + out.close(); + inp.close(); - out.println(String.format("%d,%d,%d,%s,%d", total, opDelta / interval, keyDelta / interval, formattedDelta, currentTimeInSeconds)); - } + socket.close(); } - } - - private static Operation createOperation(int index) - { - switch (session.getOperation()) + else { - case READ: - return new Reader(index); - - case COUNTER_GET: - return new CounterGetter(index); - - case INSERT: - return new Inserter(index); - - case COUNTER_ADD: - return new CounterAdder(index); - - case RANGE_SLICE: - return new RangeSlicer(index); - - case INDEXED_RANGE_SLICE: - return new IndexedRangeSlicer(index); - - case MULTI_GET: - return new MultiGetter(index); + new StressAction(session, outStream).run(); } - - throw new UnsupportedOperationException(); } /** @@ -180,56 +108,35 @@ public final class Stress } } - /** - * Produces exactly N items (awaits each to be consumed) - */ - private static class Producer extends Thread + private static class ShutDown extends Thread { - public void run() - { - for (int i = 0; i < session.getNumKeys(); i++) - { - try - { - operations.put(createOperation(i % session.getNumDifferentKeys())); - } - catch (InterruptedException e) - { - System.err.println("Producer error - " + e.getMessage()); - return; - } - } - } - } - - /** - * Each consumes exactly N items from queue - */ - private static class Consumer extends Thread - { - private final int items; + private final Socket socket; + private final ObjectOutputStream out; - public Consumer(int toConsume) + public ShutDown(Socket socket, ObjectOutputStream out) { - items = toConsume; + this.out = out; + this.socket = socket; } public void run() { - Cassandra.Client client = session.getClient(); - - for (int i = 0; i < items; i++) + try { - try - { - operations.take().run(client); // running job - } - catch (Exception e) + if (!socket.isClosed()) { - System.err.println(e.getMessage()); - System.exit(-1); + System.out.println("Control-C caught. Canceling running action and shutting down..."); + + out.writeInt(1); + out.close(); + + stopped = true; } } + catch (IOException e) + { + e.printStackTrace(); + } } } Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterAdder.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -31,9 +32,9 @@ import java.util.Map; public class CounterAdder extends Operation { - public CounterAdder(int index) + public CounterAdder(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/CounterGetter.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -27,9 +28,9 @@ import java.util.List; public class CounterGetter extends Operation { - public CounterGetter(int index) + public CounterGetter(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/IndexedRangeSlicer.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; @@ -30,9 +31,9 @@ public class IndexedRangeSlicer extends { private static List values = null; - public IndexedRangeSlicer(int index) + public IndexedRangeSlicer(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Inserter.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -33,9 +34,9 @@ public class Inserter extends Operation { private static List values; - public Inserter(int index) + public Inserter(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/MultiGetter.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -31,9 +32,9 @@ import java.util.Map; public class MultiGetter extends Operation { - public MultiGetter(int index) + public MultiGetter(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/RangeSlicer.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -30,9 +31,9 @@ import java.util.List; public class RangeSlicer extends Operation { - public RangeSlicer(int index) + public RangeSlicer(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/operations/Reader.java Sun Jun 5 13:43:22 2011 @@ -17,6 +17,7 @@ */ package org.apache.cassandra.stress.operations; +import org.apache.cassandra.stress.Session; import org.apache.cassandra.stress.util.Operation; import org.apache.cassandra.db.ColumnFamilyType; import org.apache.cassandra.thrift.*; @@ -29,9 +30,9 @@ import static com.google.common.base.Cha public class Reader extends Operation { - public Reader(int index) + public Reader(Session client, int index) { - super(index); + super(client, index); } public void run(Cassandra.Client client) throws IOException Modified: cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java?rev=1132406&r1=1132405&r2=1132406&view=diff ============================================================================== --- cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java (original) +++ cassandra/trunk/tools/stress/src/org/apache/cassandra/stress/util/Operation.java Sun Jun 5 13:43:22 2011 @@ -46,6 +46,12 @@ public abstract class Operation session = Stress.session; } + public Operation(Session client, int idx) + { + index = idx; + session = client; + } + /** * Run operation * @param client Cassandra Thrift client connection @@ -101,18 +107,18 @@ public abstract class Operation * key generator using Gauss or Random algorithm * @return byte[] representation of the key string */ - protected static byte[] generateKey() + protected byte[] generateKey() { - return (Stress.session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey(); + return (session.useRandomGenerator()) ? generateRandomKey() : generateGaussKey(); } /** * Random key generator * @return byte[] representation of the key string */ - private static byte[] generateRandomKey() + private byte[] generateRandomKey() { - String format = "%0" + Stress.session.getTotalKeysLength() + "d"; + String format = "%0" + session.getTotalKeysLength() + "d"; return String.format(format, Stress.randomizer.nextInt(Stress.session.getNumDifferentKeys() - 1)).getBytes(UTF_8); } @@ -120,9 +126,8 @@ public abstract class Operation * Gauss key generator * @return byte[] representation of the key string */ - private static byte[] generateGaussKey() + private byte[] generateGaussKey() { - Session session = Stress.session; String format = "%0" + session.getTotalKeysLength() + "d"; for (;;)