Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A889A18E3F for ; Tue, 15 Dec 2015 19:17:39 +0000 (UTC) Received: (qmail 59008 invoked by uid 500); 15 Dec 2015 19:17:39 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 58973 invoked by uid 500); 15 Dec 2015 19:17:39 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 58943 invoked by uid 99); 15 Dec 2015 19:17:39 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 15 Dec 2015 19:17:39 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03198DFF87; Tue, 15 Dec 2015 19:17:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jake@apache.org To: commits@cassandra.apache.org Date: Tue, 15 Dec 2015 19:17:38 -0000 Message-Id: <80d749dab92c4aa7abebfdae4e86680e@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cassandra git commit: sstableloader will fail if there are collections in the schema tables Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 db40ef839 -> 942e5e599 sstableloader will fail if there are collections in the schema tables Fix and new testcase Patch by tjake; reviewed by Aleksey Yeschenko for CASSANDRA-10700 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/994250c8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/994250c8 Diff: 
http://git-wip-us.apache.org/repos/asf/cassandra/diff/994250c8 Branch: refs/heads/cassandra-3.0 Commit: 994250c8d38b3b4299f2e33ebe405ff601b5ae85 Parents: cee35e4 Author: T Jake Luciani Authored: Mon Dec 14 11:40:53 2015 -0500 Committer: T Jake Luciani Committed: Tue Dec 15 14:15:12 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 4 + .../org/apache/cassandra/tools/BulkLoader.java | 31 +++- .../apache/cassandra/tools/BulkLoaderTest.java | 172 +++++++++++++++++++ 3 files changed, 202 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7f1d66b..8e58703 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,8 +1,12 @@ 2.1.13 +<<<<<<< HEAD * Allow CREATE TABLE WITH ID (CASSANDRA-9179) * Make Stress compiles within eclipse (CASSANDRA-10807) * Cassandra Daemon should print JVM arguments (CASSANDRA-10764) * Allow cancellation of index summary redistribution (CASSANDRA-8805) +======= + * sstableloader will fail if there are collections in the schema tables (CASSANDRA-10700) +>>>>>>> 5377183... 
stableloader will fail if there are collections in the schema tables * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474) * Fix Stress profile parsing on Windows (CASSANDRA-10808) http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index f4b30cb..96e826d 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -24,9 +24,10 @@ import java.util.*; import com.google.common.base.Joiner; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import org.apache.commons.cli.*; +import org.apache.commons.lang3.StringUtils; + import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TTransport; @@ -310,10 +311,11 @@ public class BulkLoader } } - String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'", - Keyspace.SYSTEM_KS, - SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, - keyspace); + String cfQuery = String.format("SELECT %s FROM %s.%s WHERE keyspace_name = '%s'", + StringUtils.join(getCFColumnsWithoutCollections(), ","), + Keyspace.SYSTEM_KS, + SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, + keyspace); CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, ConsistencyLevel.ONE); @@ -340,6 +342,25 @@ public class BulkLoader } } + //Remove dropped_columns since we can't parse collections in v2 which is used by thrift + //See CASSANDRA-10700 + List getCFColumnsWithoutCollections() + { + + Iterator allColumns = CFMetaData.SchemaColumnFamiliesCf.allColumnsInSelectOrder(); + List selectedColumns = new ArrayList<>(); + + while (allColumns.hasNext()) + { 
+ ColumnDefinition def = allColumns.next(); + + if (!def.type.isCollection()) + selectedColumns.add(UTF8Type.instance.getString(def.name.bytes)); + } + + return selectedColumns; + } + @Override public StreamConnectionFactory getConnectionFactory() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/994250c8/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java new file mode 100644 index 0000000..dcdb7eb --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/BulkLoaderTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools; + + +import java.io.File; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.io.sstable.CQLSSTableWriter; +import org.apache.cassandra.io.sstable.SSTableLoader; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.thrift.TFramedTransportFactory; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.OutputHandler; + +import static org.junit.Assert.assertEquals; + +public class BulkLoaderTest +{ + + static EmbeddedCassandraService embeddedCassandraService = new EmbeddedCassandraService(); + + @BeforeClass + public static void setup() throws Exception + { + SchemaLoader.cleanupAndLeaveDirs(); + embeddedCassandraService.start(); + + + QueryProcessor.executeInternal("CREATE KEYSPACE cql_keyspace WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + } + + + @Test + public void testClientWriter() throws Exception + { + String KS = "cql_keyspace"; + String TABLE = "table2"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + String schema = "CREATE TABLE cql_keyspace.table2 (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int" + + ")"; + + QueryProcessor.executeInternal(schema); + + String insert = "INSERT INTO cql_keyspace.table2 (k, v1, v2) VALUES (?, ?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + 
.inDirectory(dataDir) + .forTable(schema) + .withPartitioner(StorageService.instance.getPartitioner()) + .using(insert).build(); + + writer.addRow(0, "test1", 24); + writer.addRow(1, "test2", null); + writer.addRow(2, "test3", 42); + writer.addRow(ImmutableMap.of("k", 3, "v2", 12)); + writer.close(); + + BulkLoader.ExternalClient client = new BulkLoader.ExternalClient(Sets.newHashSet(FBUtilities.getLocalAddress()), + DatabaseDescriptor.getRpcPort(), + null, null, new TFramedTransportFactory(), + DatabaseDescriptor.getStoragePort(), + DatabaseDescriptor.getSSLStoragePort(), null); + + + SSTableLoader loader = new SSTableLoader(dataDir, client, new OutputHandler.SystemOutput(false, false)); + + + + loader.stream().get(); + + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table2;"); + assertEquals(4, rs.size()); + } + + + @Test + public void testClientWriterWithDroppedColumn() throws Exception + { + String KS = "cql_keyspace"; + String TABLE = "table3"; + + File tempdir = Files.createTempDir(); + File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE); + assert dataDir.mkdirs(); + + String schemaToDrop = "CREATE TABLE cql_keyspace.table3 (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int," + + " v3 list," + + " v4 text" + + ")"; + + QueryProcessor.executeInternal(schemaToDrop); + QueryProcessor.executeInternal("ALTER TABLE cql_keyspace.table3 DROP v4"); + + + String schema = "CREATE TABLE cql_keyspace.table3 (" + + " k int PRIMARY KEY," + + " v1 text," + + " v2 int," + + " v3 list" + + ")"; + + + String insert = "INSERT INTO cql_keyspace.table3 (k, v1, v2, v3) VALUES (?, ?, ?, ?)"; + CQLSSTableWriter writer = CQLSSTableWriter.builder() + .inDirectory(dataDir) + .forTable(schema) + .withPartitioner(StorageService.instance.getPartitioner()) + .using(insert).build(); + + writer.addRow(0, "test1", 24, Lists.newArrayList(4)); + writer.addRow(1, "test2", null, Lists.newArrayList(4,4,5)); + 
writer.addRow(2, "test3", 42, null); + writer.close(); + + BulkLoader.ExternalClient client = new BulkLoader.ExternalClient(Sets.newHashSet(FBUtilities.getLocalAddress()), + DatabaseDescriptor.getRpcPort(), + null, null, new TFramedTransportFactory(), + DatabaseDescriptor.getStoragePort(), + DatabaseDescriptor.getSSLStoragePort(), null); + + SSTableLoader loader = new SSTableLoader(dataDir, client, new OutputHandler.SystemOutput(false, false)); + + + loader.stream().get(); + + + CFMetaData cfMetaData = client.getCFMetaData(KS, TABLE); + assert cfMetaData != null; + + UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM cql_keyspace.table3;"); + assertEquals(3, rs.size()); + } + +}