cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [09/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.9
Date Thu, 04 Aug 2016 14:18:48 GMT
Merge branch 'cassandra-3.0' into cassandra-3.9


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2e0ace7b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2e0ace7b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2e0ace7b

Branch: refs/heads/trunk
Commit: 2e0ace7bca45a33e5b220660bebc6afbdbbd8e5c
Parents: 042e1f7 cc8f6cc
Author: Yuki Morishita <yukim@apache.org>
Authored: Wed Aug 3 20:39:30 2016 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Aug 3 20:39:30 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 31d9434,49733d3..dd04ddf
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -18,45 -13,6 +18,46 @@@ Merged from 3.0
     to connect with too low of a protocol version (CASSANDRA-11464)
   * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
   * Fix problem with undeleteable rows on upgrade to new sstable format (CASSANDRA-12144)
 +Merged from 2.2:
++ * Release sstables of failed stream sessions only when outgoing transfers are finished
(CASSANDRA-11345)
 + * Wait for tracing events before returning response and query at same consistency level
client side (CASSANDRA-11465)
 + * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
 + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 +Merged from 2.1:
 + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
 + * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
 +
 +
 +3.8
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms (CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in SP.sendMessagesToNonlocalDC
(CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter (CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool (CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool (CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas (CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
   * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
   * Wait until the message is being send to decide which serializer must be used (CASSANDRA-11393)
   * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index e8d0cae,c1c5055..4f313c3
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -22,7 -22,9 +22,8 @@@ import java.util.concurrent.*
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.atomic.AtomicInteger;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.base.Throwables;
 -import com.google.common.collect.Iterables;
  
  import org.apache.cassandra.concurrent.NamedThreadFactory;
  import org.apache.cassandra.io.sstable.format.SSTableReader;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2e0ace7b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,dce56eb..fe75da1
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -103,4 -114,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
 -        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
++        StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null,
false);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(),
streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
+         for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }


Mime
View raw message