cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [3/3] git commit: merge from 1.2
Date Fri, 21 Jun 2013 14:27:53 GMT
merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/140b0311
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/140b0311
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/140b0311

Branch: refs/heads/trunk
Commit: 140b0311df890d2258f19f3df98f1a996b6f2a6e
Parents: b73f9d4 fbe8a6e
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Fri Jun 21 09:27:21 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Jun 21 09:27:21 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DebuggableThreadPoolExecutor.java           | 22 ++++++++---
 .../cassandra/concurrent/StageManager.java      | 29 +++++++++------
 .../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  | 14 ++++---
 .../cassandra/net/OutboundTcpConnection.java    | 13 ++++++-
 .../cassandra/service/MigrationManager.java     |  3 +-
 .../cassandra/tracing/ExpiredTraceState.java    | 39 ++++++++++++++++++++
 .../apache/cassandra/tracing/TraceState.java    | 12 ++++--
 .../org/apache/cassandra/tracing/Tracing.java   | 26 +++++++------
 .../service/AntiEntropyServiceTestAbstract.java |  9 +++--
 11 files changed, 156 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 044add4,593bf7c..29c6fd2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,69 -1,5 +1,70 @@@
 +2.0
 + * Removed on-heap row cache (CASSANDRA-5348)
 + * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
 + * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
 + * Experimental triggers (CASSANDRA-1311)
 + * JEMalloc support for off-heap allocation (CASSANDRA-3997)
 + * Single-pass compaction (CASSANDRA-4180)
 + * Removed token range bisection (CASSANDRA-5518)
 + * Removed compatibility with pre-1.2.5 sstables and network messages
 +   (CASSANDRA-5511)
 + * removed PBSPredictor (CASSANDRA-5455)
 + * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619)
 + * Leveled compaction performs size-tiered compactions in L0 
 +   (CASSANDRA-5371, 5439)
 + * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
 + * Log when a node is down longer than the hint window (CASSANDRA-4554)
 + * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
 + * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
 + * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
 + * Change Message IDs to ints (CASSANDRA-5307)
 + * Move sstable level information into the Stats component, removing the
 +   need for a separate Manifest file (CASSANDRA-4872)
 + * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
 + * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
 + * add default_time_to_live (CASSANDRA-3974)
 + * add memtable_flush_period_in_ms (CASSANDRA-4237)
 + * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
 + * upgrade thrift to 0.9.0 (CASSANDRA-3719)
 + * drop unnecessary keyspace parameter from user-defined compaction API 
 +   (CASSANDRA-5139)
 + * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
 + * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
 + * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
 + * Allow custom configuration loader (CASSANDRA-5045)
 + * Remove memory emergency pressure valve logic (CASSANDRA-3534)
 + * Reduce request latency with eager retry (CASSANDRA-4705)
 + * cqlsh: Remove ASSUME command (CASSANDRA-5331)
 + * Rebuild BF when loading sstables if bloom_filter_fp_chance
 +   has changed since compaction (CASSANDRA-5015)
 + * remove row-level bloom filters (CASSANDRA-4885)
 + * Change Kernel Page Cache skipping into row preheating (disabled by default)
 +   (CASSANDRA-4937)
 + * Improve repair by deciding on a gcBefore before sending
 +   out TreeRequests (CASSANDRA-4932)
 + * Add an official way to disable compactions (CASSANDRA-5074)
 + * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
 + * Add binary protocol versioning (CASSANDRA-5436)
 + * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
 + * Add alias support to SELECT statement (CASSANDRA-5075)
 + * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
 + * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
 + * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
 + * Track max/min column names in sstables to be able to optimize slice
 +   queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
 + * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
 + * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
 + * Support native link w/o JNA in Java7 (CASSANDRA-3734)
 + * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
 + * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
 + * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
 + * Include a timestamp with all read commands to determine column expiration
 +   (CASSANDRA-5149)
 + * Streaming 2.0 (CASSANDRA-5286)
 + * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737)
 +
  1.2.6
+  * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
   * Fix cross-DC mutation forwarding (CASSANDRA-5632)
   * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
   * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 8e94f8e,2964d35..e01183c
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -55,12 -60,11 +56,12 @@@ import org.apache.cassandra.metrics.Dro
  import org.apache.cassandra.net.sink.SinkManager;
  import org.apache.cassandra.security.SSLFactory;
  import org.apache.cassandra.service.*;
 +import org.apache.cassandra.service.paxos.Commit;
 +import org.apache.cassandra.service.paxos.PrepareResponse;
  import org.apache.cassandra.streaming.*;
 -import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+ import org.apache.cassandra.tracing.TraceState;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.*;
- import org.cliffc.high_scale_lib.NonBlockingHashMap;
  
  public final class MessagingService implements MessagingServiceMBean
  {
@@@ -677,23 -701,38 +678,24 @@@
          }
      }
  
 -    public void receive(MessageIn message, String id, long timestamp)
 +    public void receive(MessageIn message, int id, long timestamp)
      {
-         Tracing.instance().initializeFromMessage(message);
-         Tracing.trace("Message received from {}", message.from);
+         TraceState state = Tracing.instance().initializeFromMessage(message);
+         if (state != null)
+             state.trace("Message received from {}", message.from);
  
          message = SinkManager.processInboundMessage(message, id);
          if (message == null)
              return;
  
          Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
-         ExecutorService stage = StageManager.getStage(message.getMessageType());
+         TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
          assert stage != null : "No stage for message type " + message.verb;
  
-         stage.execute(runnable);
 -        if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
 -        {
 -            IMessageCallback cb = MessagingService.instance().getRegisteredCallback(id).callback;
 -
 -            if (cb instanceof AbstractWriteResponseHandler)
 -            {
 -                PBSPredictor.instance().logWriteResponse(id, timestamp);
 -            }
 -            else if (cb instanceof ReadCallback)
 -            {
 -                PBSPredictor.instance().logReadResponse(id, timestamp);
 -            }
 -        }
 -
+         stage.execute(runnable, state);
      }
  
 -    public void setCallbackForTests(String messageId, CallbackInfo callback)
 +    public void setCallbackForTests(int messageId, CallbackInfo callback)
      {
          callbacks.put(messageId, callback);
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 52a415c,ee30d36..eaab3ad
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -182,15 -179,25 +182,24 @@@ public class OutboundTcpConnection exte
              {
                  UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                  TraceState state = Tracing.instance().get(sessionId);
-                 state.trace("Sending message to {}", poolReference.endPoint());
-                 Tracing.instance().stopIfNonLocal(state);
+                 String message = String.format("Sending message to %s", poolReference.endPoint());
+                 // session may have already finished; see CASSANDRA-5668
+                 if (state == null)
+                 {
+                     TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+                 }
+                 else
+                 {
+                     state.trace(message);
+                     Tracing.instance().stopIfNonLocal(state);
+                 }
              }
  
 -            write(qm.message, qm.id, qm.timestamp, out, targetVersion);
 +            writeInternal(qm.message, qm.id, qm.timestamp);
 +
              completed++;
              if (active.peek() == null)
 -            {
                  out.flush();
 -            }
          }
          catch (Exception e)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationManager.java
index d602d25,de34785..cce674f
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@@ -23,9 -23,7 +23,10 @@@ import java.io.IOException
  import java.net.InetAddress;
  import java.nio.ByteBuffer;
  import java.util.*;
 +import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.*;
  
  import java.lang.management.ManagementFactory;
  import java.lang.management.RuntimeMXBean;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index 52e6d04,25599c4..b4cff93
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -91,12 -96,14 +94,13 @@@ public class TraceStat
              public void runMayThrow() throws Exception
              {
                  CFMetaData cfMeta = CFMetaData.TraceEventsCf;
 -                ColumnFamily cf = ColumnFamily.create(cfMeta);
 +                ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
 +                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
                  Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
-                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
 -                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
+                 if (elapsed >= 0)
+                     Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
 -                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
 -                RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
 -                mutation.add(cf);
 +                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
 +                RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf);
                  StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
              }
          });

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/Tracing.java
index 5637f48,eb5bad9..8782ee5
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@@ -33,9 -33,13 +33,10 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.cql3.ColumnNameBuilder;
 -import org.apache.cassandra.db.ColumnFamily;
 -import org.apache.cassandra.db.ConsistencyLevel;
 -import org.apache.cassandra.db.ExpiringColumn;
 -import org.apache.cassandra.db.RowMutation;
 +import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.marshal.TimeUUIDType;
  import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageProxy;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 79aed42,b80c272..c930cc3
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@@ -48,6 -48,7 +48,7 @@@ import org.apache.cassandra.utils.ByteB
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MerkleTree;
  
 -import static org.apache.cassandra.service.AntiEntropyService.*;
++import static org.apache.cassandra.service.ActiveRepairService.*;
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertTrue;
  


Mime
View raw message