cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/3] git commit: Merge branch 'cassandra-1.1' into trunk
Date Wed, 31 Oct 2012 16:11:33 GMT
Updated Branches:
  refs/heads/cassandra-1.1 99b245d31 -> d401d9f1d
  refs/heads/trunk c320b82be -> c631cc7c3


Merge branch 'cassandra-1.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/streaming/IncomingStreamReader.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c631cc7c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c631cc7c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c631cc7c

Branch: refs/heads/trunk
Commit: c631cc7c3a9f75d9d408038d88532a59a3b51c5e
Parents: c320b82 d401d9f
Author: Yuki Morishita <yukim@apache.org>
Authored: Wed Oct 31 11:10:54 2012 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed Oct 31 11:10:54 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../cassandra/streaming/IncomingStreamReader.java  |   16 +++++++++++++++
 .../cassandra/streaming/StreamInSession.java       |    6 +++++
 3 files changed, 23 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c631cc7c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fed3bc1,5f5ea89..11aaea1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -58,77 -11,9 +58,78 @@@ Merged from 1.1
   * fix compositeType.{get/from}String methods (CASSANDRA-4842)
   * (CQL) fix CREATE COLUMNFAMILY permissions check (CASSANDRA-4864)
   * Fix DynamicCompositeType same type comparison (CASSANDRA-4711)
+  * Fix duplicate SSTable reference when stream session failed (CASSANDRA-3306)
  
  
 +
 +1.2-beta1
 + * add atomic_batch_mutate (CASSANDRA-4542, -4635)
 + * increase default max_hint_window_in_ms to 3h (CASSANDRA-4632)
 + * include message initiation time to replicas so they can more
 +   accurately drop timed-out requests (CASSANDRA-2858)
 + * fix clientutil.jar dependencies (CASSANDRA-4566)
 + * optimize WriteResponse (CASSANDRA-4548)
 + * new metrics (CASSANDRA-4009)
 + * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897)
 + * debug tracing (CASSANDRA-1123)
 + * parallelize row cache loading (CASSANDRA-4282)
 + * Make compaction, flush JBOD-aware (CASSANDRA-4292)
 + * run local range scans on the read stage (CASSANDRA-3687)
 + * clean up ioexceptions (CASSANDRA-2116)
 + * add disk_failure_policy (CASSANDRA-2118)
 + * Introduce new json format with row level deletion (CASSANDRA-4054)
 + * remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
 + * improve "nodetool ring" handling of multi-dc clusters (CASSANDRA-3047)
 + * update NTS calculateNaturalEndpoints to be O(N log N) (CASSANDRA-3881)
 + * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
 + * split up rpc timeout by operation type (CASSANDRA-2819)
 + * rewrite key cache save/load to use only sequential i/o (CASSANDRA-3762)
 + * update MS protocol with a version handshake + broadcast address id
 +   (CASSANDRA-4311)
 + * multithreaded hint replay (CASSANDRA-4189)
 + * add inter-node message compression (CASSANDRA-3127)
 + * remove COPP (CASSANDRA-2479)
 + * Track tombstone expiration and compact when tombstone content is
 +   higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234)
 + * update MurmurHash to version 3 (CASSANDRA-2975)
 + * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
 + * (CLI) jline version is bumped to 1.0 to properly  support
 +   'delete' key function (CASSANDRA-4132)
 + * Save IndexSummary into new SSTable 'Summary' component (CASSANDRA-2392, 4289)
 + * Add support for range tombstones (CASSANDRA-3708)
 + * Improve MessagingService efficiency (CASSANDRA-3617)
 + * Avoid ID conflicts from concurrent schema changes (CASSANDRA-3794)
 + * Set thrift HSHA server thread limit to unlimited by default (CASSANDRA-4277)
 + * Avoids double serialization of CF id in RowMutation messages
 +   (CASSANDRA-4293)
 + * stream compressed sstables directly with java nio (CASSANDRA-4297)
 + * Support multiple ranges in SliceQueryFilter (CASSANDRA-3885)
 + * Add column metadata to system column families (CASSANDRA-4018)
 + * (cql3) Always use composite types by default (CASSANDRA-4329)
 + * (cql3) Add support for set, map and list (CASSANDRA-3647)
 + * Validate date type correctly (CASSANDRA-4441)
 + * (cql3) Allow definitions with only a PK (CASSANDRA-4361)
 + * (cql3) Add support for row key composites (CASSANDRA-4179)
 + * improve DynamicEndpointSnitch by using reservoir sampling (CASSANDRA-4038)
 + * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
 + * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477)
 + * remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487)
 + * add Murmur3Partitioner and make it default for new installations (CASSANDRA-3772, 4621)
 + * (cql3) update pseudo-map syntax to use map syntax (CASSANDRA-4497)
 + * Finer grained exceptions hierarchy and provides error code with exceptions (CASSANDRA-3979)
 + * Adds events push to binary protocol (CASSANDRA-4480)
 + * Rewrite nodetool help (CASSANDRA-2293)
 + * Make CQL3 the default for CQL (CASSANDRA-4640)
 + * update stress tool to be able to use CQL3 (CASSANDRA-4406)
 + * Accept all thrift update on CQL3 cf but don't expose their metadata (CASSANDRA-4377)
 + * Replace Throttle with Guava's RateLimiter for HintedHandOff (CASSANDRA-4541)
 + * fix counter add/get using CQL2 and CQL3 in stress tool (CASSANDRA-4633)
 + * Add sstable count per level to cfstats (CASSANDRA-4537)
 + * (cql3) Add ALTER KEYSPACE statement (CASSANDRA-4611)
 + * (cql3) Allow defining default consistency levels (CASSANDRA-4448)
 + * (cql3) Fix queries using LIMIT missing results (CASSANDRA-4579)
 + * fix cross-version gossip messaging (CASSANDRA-4576)
 +
  1.1.6
   * Wait for writes on synchronous read digest mismatch (CASSANDRA-4792)
   * fix commitlog replay for nanotime-infected sstables (CASSANDRA-4782)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c631cc7c/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index 1c3fc4e,bfb046f..0859eaa
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@@ -37,14 -34,19 +37,16 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.io.IColumnSerializer;
  import org.apache.cassandra.io.sstable.*;
  import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.metrics.StreamingMetrics;
++import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.net.OutboundTcpConnection;
  import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.compress.CompressedInputStream;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.BytesReadTracker;
 -import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
 -
  import com.ning.compress.lzf.LZFInputStream;
  
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
 -
  public class IncomingStreamReader
  {
      private static final Logger logger = LoggerFactory.getLogger(IncomingStreamReader.class);
@@@ -58,8 -59,19 +60,22 @@@
      public IncomingStreamReader(StreamHeader header, Socket socket) throws IOException
      {
          socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout());
 -        this.socket = socket;
          InetAddress host = header.broadcastAddress != null ? header.broadcastAddress
                             : ((InetSocketAddress)socket.getRemoteSocketAddress()).getAddress();
+         if (header.pendingFiles.isEmpty() && header.file != null)
+         {
+             // StreamInSession should be created already when receiving 2nd and after files
+             if (!StreamInSession.hasSession(host, header.sessionId))
+             {
+                 StreamReply reply = new StreamReply("", header.sessionId, StreamReply.Status.SESSION_FAILURE);
 -                OutboundTcpConnection.write(reply.getMessage(Gossiper.instance.getVersion(host)),
Long.toString(header.sessionId), new DataOutputStream(socket.getOutputStream()));
++                OutboundTcpConnection.write(reply.createMessage(),
++                                            Long.toString(header.sessionId),
++                                            System.currentTimeMillis(),
++                                            new DataOutputStream(socket.getOutputStream()),
++                                            MessagingService.instance().getVersion(host));
+                 throw new IOException("Session " + header.sessionId + " already closed.");
+             }
+         }
          session = StreamInSession.get(host, header.sessionId);
          session.setSocket(socket);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c631cc7c/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamInSession.java
index 5753a15,e1bbe41..f8bde91
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@@ -101,6 -102,12 +101,12 @@@ public class StreamInSession extends Ab
          return session;
      }
  
+     public static boolean hasSession(InetAddress host, long sessionId)
+     {
 -        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host,
sessionId);
++        Pair<InetAddress, Long> context = Pair.create(host, sessionId);
+         return sessions.get(context) != null;
+     }
+ 
      public void setCurrentFile(PendingFile file)
      {
          this.current = file;


Mime
View raw message